summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <[email protected]>2023-09-01 11:33:07 +0300
committerauzhegov <[email protected]>2023-09-01 11:55:24 +0300
commitce4f14f3b059b1b44b3c87a82abe5144c29ce650 (patch)
treef9b2f5248e5a3583ba869635d935973344b51c02
parent804d5f57f707a59fe23a62426e5d454cb3c03115 (diff)
CreateConnection/CreateBinding improvements
-rw-r--r--ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp6
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h109
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp41
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h7
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp644
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h14
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp44
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/events/events.h5
8 files changed, 517 insertions, 353 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
index be2c5c3ea43..5524f1921ce 100644
--- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
@@ -393,8 +393,7 @@ private:
TDuration::Seconds(30),
Counters,
CommonConfig,
- Signer,
- true
+ Signer
));
}
if (Connections.empty()) {
@@ -422,8 +421,7 @@ private:
SelfId(),
request,
TDuration::Seconds(30),
- Counters,
- true
+ Counters
));
}
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
index a90e0f44256..d3573a2e680 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
@@ -19,14 +19,8 @@ using namespace NActors;
using namespace NThreading;
using namespace NYdb;
-template<typename T>
-struct TBaseActorTypeTag {
- using TRequest = typename T::TRequest;
- using TResponse = typename T::TResponse;
-};
-
template<typename TDerived>
-class TBaseActor : public NActors::TActorBootstrapped<TDerived> {
+class TPlainBaseActor : public NActors::TActorBootstrapped<TDerived> {
public:
using TBase = NActors::TActorBootstrapped<TDerived>;
using TBase::Become;
@@ -34,18 +28,14 @@ public:
using TBase::SelfId;
using TBase::Send;
- using TEventRequestPtr = typename TBaseActorTypeTag<TDerived>::TRequest::TPtr;
- using TEventResponse = typename TBaseActorTypeTag<TDerived>::TResponse;
-
public:
- TBaseActor(const TActorId& proxyActorId,
- const TEventRequestPtr request,
+ TPlainBaseActor(const TActorId& successActorId,
+ const TActorId& errorActorId,
TDuration requestTimeout,
const NPrivate::TRequestCommonCountersPtr& counters)
- : Request(std::move(request))
- , Counters(counters)
- , ProxyActorId(proxyActorId)
- , ResponseActorId(Request->Sender)
+ : Counters(counters)
+ , SuccessActorId(successActorId)
+ , ErrorActorId(errorActorId)
, RequestTimeout(requestTimeout) { }
void Bootstrap() {
@@ -55,17 +45,24 @@ public:
BootstrapImpl();
}
- void SendErrorMessageToSender(const NYql::TIssue& issue) {
- Counters->Error->Inc();
- NYql::TIssues issues;
- issues.AddIssue(issue);
- Send(ResponseActorId, new TEventResponse(issues, {}), 0, Request->Cookie);
+ template<class THandler>
+ void SendRequestToSender(TAutoPtr<THandler> handle) {
+ Counters->Ok->Inc();
+ Send(handle->Forward(SuccessActorId));
PassAway();
}
- void SendRequestToSender() {
+ void SendRequestToSender(IEventBase* event) {
Counters->Ok->Inc();
- Send(Request->Forward(ProxyActorId));
+ Send(SuccessActorId, event);
+ PassAway();
+ }
+
+ void SendErrorMessageToSender(IEventBase* event,
+ NActors::TActorIdentity::TEventFlags flags = 0,
+ ui64 cookie = 0) {
+ Counters->Error->Inc();
+ Send(ErrorActorId, event, flags, cookie);
PassAway();
}
@@ -73,9 +70,55 @@ public:
CPP_LOG_D("TBaseActor Timeout occurred. Actor id: "
<< SelfId());
Counters->Timeout->Inc();
- SendErrorMessageToSender(MakeErrorIssue(
- TIssuesIds::TIMEOUT,
- "Timeout occurred. Try repeating the request later"));
+ SendErrorMessageToSender(MakeTimeoutEventImpl(
+ MakeErrorIssue(TIssuesIds::TIMEOUT,
+ "Timeout occurred. Try repeating the request later")));
+ }
+
+protected:
+ virtual void BootstrapImpl() = 0;
+ virtual IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) = 0;
+
+protected:
+ const NPrivate::TRequestCommonCountersPtr Counters;
+ const TActorId SuccessActorId;
+ const TActorId ErrorActorId;
+ const TDuration RequestTimeout;
+};
+
+template<typename T>
+struct TBaseActorTypeTag {
+ using TRequest = typename T::TRequest;
+ using TResponse = typename T::TResponse;
+};
+
+template<typename TDerived>
+class TBaseActor : public TPlainBaseActor<TDerived> {
+public:
+ using TBase = TPlainBaseActor<TDerived>;
+ using TBase::Become;
+ using TBase::PassAway;
+ using TBase::SelfId;
+ using TBase::Send;
+
+ using TEventRequestPtr = typename TBaseActorTypeTag<TDerived>::TRequest::TPtr;
+ using TEventResponse = typename TBaseActorTypeTag<TDerived>::TResponse;
+
+public:
+ TBaseActor(const TActorId& proxyActorId,
+ const TEventRequestPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters)
+ : TPlainBaseActor<TDerived>(proxyActorId,
+ request->Sender,
+ std::move(requestTimeout),
+ counters)
+ , Request(std::move(request)) { }
+
+ void SendErrorMessageToSender(const NYql::TIssue& issue) {
+ TBase::SendErrorMessageToSender(new TEventResponse({issue}, {}),
+ 0,
+ Request->Cookie);
}
void HandleError(const TString& message, EStatus status, const NYql::TIssues& issues) {
@@ -92,15 +135,17 @@ public:
SendErrorMessageToSender(std::move(issue));
}
-private:
- virtual void BootstrapImpl() = 0;
+ void SendRequestToSender() {
+ TBase::SendRequestToSender(Request);
+ }
+
+protected:
+ IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) final {
+ return new TEventResponse({std::move(issue)}, {});
+ };
protected:
const TEventRequestPtr Request;
- const NPrivate::TRequestCommonCountersPtr Counters;
- const TActorId ProxyActorId;
- const TActorId ResponseActorId;
- const TDuration RequestTimeout;
};
} // namespace NPrivate
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
index d9666ad965c..c2de079c09e 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
@@ -132,47 +132,6 @@ TString DescribeConnectionErrorMessageFactoryMethod(const NYql::TIssues& issues)
return "Couldn't resolve connection";
};
-NActors::IActor* MakeListConnectionActor(
- const TActorId& proxyActorId,
- const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request,
- TCounters& counters,
- TDuration requestTimeout,
- TPermissions permissions) {
- auto cpsRequestFactory =
- [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event) {
- FederatedQuery::ListConnectionsRequest result;
- auto connectionName = event->Get()->Request.content().name();
- result.mutable_filter()->set_name(connectionName);
- result.set_limit(2);
- return result;
- };
- auto cpsRequestPostProcessor =
- [](TEvControlPlaneStorage::TEvListConnectionsRequest& event) {
- event.IsExactNameMatch = true;
- };
- auto entityNameExtractorFactoryMethod =
- [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event,
- const FederatedQuery::ListConnectionsResult& result) {
- event->Get()->CPSListingFinished = true;
- event->Get()->CPSConnectionCount = result.connection_size();
- };
-
- return new TControlPlaneStorageRequesterActor<
- TEvControlPlaneProxy::TEvCreateConnectionRequest,
- TEvControlPlaneProxy::TEvCreateConnectionResponse,
- TEvControlPlaneStorage::TEvListConnectionsRequest,
- TEvControlPlaneStorage::TEvListConnectionsResponse>(
- proxyActorId,
- request,
- requestTimeout,
- counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
- permissions,
- cpsRequestFactory,
- DescribeConnectionErrorMessageFactoryMethod,
- entityNameExtractorFactoryMethod,
- cpsRequestPostProcessor);
-}
-
NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request,
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
index 60c89e9f95c..86252f6e3b3 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
@@ -39,13 +39,6 @@ NActors::IActor* MakeDiscoverYDBConnectionNameActor(
TDuration requestTimeout,
TPermissions permissions);
-NActors::IActor* MakeListConnectionActor(
- const TActorId& proxyActorId,
- const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request,
- TCounters& counters,
- TDuration requestTimeout,
- TPermissions permissions);
-
/// Discover binding_name
NActors::IActor* MakeDiscoverYDBBindingNameActor(
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
index 583c0285754..64102172cf0 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/config/yq_issue.h>
#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
#include <ydb/public/api/protos/draft/fq.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
@@ -28,229 +29,427 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
using TResponse = TEventResponse;
};
+using TScheduleErrorRecoverySQLGeneration =
+ std::function<bool(NActors::TActorId sender, const TStatus& issues)>;
+
struct TSchemaQueryTask {
TString SQL;
TMaybe<TString> RollbackSQL;
+ TScheduleErrorRecoverySQLGeneration ScheduleErrorRecoverySQLGeneration;
+};
+
+struct TEvPrivate {
+ enum EEv {
+ EvProcessNextTaskRequest = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvQueryExecutionResponse,
+ EvRecoveryResponse,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE),
+ "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ struct TEvProcessNextTaskRequest :
+ NActors::TEventLocal<TEvProcessNextTaskRequest, EvProcessNextTaskRequest> { };
+
+ struct TEvQueryExecutionResponse :
+ NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> {
+ TEvQueryExecutionResponse(TStatus result)
+ : Result(std::move(result)) { }
+
+ TStatus Result;
+ };
+
+ struct TEvRecoveryResponse :
+ NActors::TEventLocal<TEvRecoveryResponse, EvRecoveryResponse> {
+ TEvRecoveryResponse(TMaybe<TString> recoverySQL, TStatus result)
+ : RecoverySQL(recoverySQL)
+ , Result(std::move(result)) { }
+
+ TMaybe<TString> RecoverySQL;
+ TStatus Result;
+ };
};
template<class TEventRequest, class TEventResponse>
class TSchemaQueryYDBActor :
public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
private:
- struct TEvPrivate {
- enum EEv {
- EvQueryExecutionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
- EvEnd
- };
-
- static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE),
- "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
-
- struct TEvQueryExecutionResponse :
- NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> {
- TStatus Result;
- size_t TaskIndex = 0u;
- bool Rollback = false;
- TMaybe<TStatus> MaybeInitialStatus;
-
- TEvQueryExecutionResponse(TStatus result,
- size_t taskIndex,
- bool rollback,
- TMaybe<TStatus> MaybeInitialStatus)
- : Result(std::move(result))
- , TaskIndex(taskIndex)
- , Rollback(rollback)
- , MaybeInitialStatus(std::move(MaybeInitialStatus)) { }
- };
- };
-
using TBase = TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>;
- using TBase::SelfId;
+ using TBase::Become;
using TBase::Request;
+ using TBase::SelfId;
using TEventRequestPtr = typename TEventRequest::TPtr;
public:
- using TTasks = std::vector<TSchemaQueryTask>;
- using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>;
- using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>;
- using TErrorMessageFactoryMethod = std::function<TString(const TStatus& status)>;
+ using TTasks = std::vector<TSchemaQueryTask>;
+ using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>;
+ using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>;
+ using TErrorMessageFactoryMethod =
+ std::function<TString(const EStatus status, const NYql::TIssues& issues)>;
TSchemaQueryYDBActor(const TActorId& proxyActorId,
const TEventRequestPtr request,
TDuration requestTimeout,
const NPrivate::TRequestCommonCountersPtr& counters,
TQueryFactoryMethod queryFactoryMethod,
- TErrorMessageFactoryMethod errorMessageFactoryMethod,
- bool successOnAlreadyExists = false)
+ TErrorMessageFactoryMethod errorMessageFactoryMethod)
: TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
proxyActorId, std::move(request), requestTimeout, counters)
, Tasks{TSchemaQueryTask{.SQL = queryFactoryMethod(Request)}}
- , ErrorMessageFactoryMethod(errorMessageFactoryMethod)
- , SuccessOnAlreadyExists(successOnAlreadyExists)
- { }
+ , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
TSchemaQueryYDBActor(const TActorId& proxyActorId,
const TEventRequestPtr request,
TDuration requestTimeout,
const NPrivate::TRequestCommonCountersPtr& counters,
TTasksFactoryMethod tasksFactoryMethod,
- TErrorMessageFactoryMethod errorMessageFactoryMethod,
- bool successOnAlreadyExists = false)
+ TErrorMessageFactoryMethod errorMessageFactoryMethod)
: TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
proxyActorId, std::move(request), requestTimeout, counters)
, Tasks(tasksFactoryMethod(Request))
- , ErrorMessageFactoryMethod(errorMessageFactoryMethod)
- , SuccessOnAlreadyExists(successOnAlreadyExists)
- { }
+ , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR";
void BootstrapImpl() override {
CPP_LOG_I("TSchemaQueryYDBActor BootstrapImpl. Actor id: " << TBase::SelfId());
- InitiateSchemaQueryExecution(0, false, Nothing());
+ ScheduleNextTask();
+ }
+
+ // Normal state
+ void HandleProcessNextTask(typename TEvPrivate::TEvProcessNextTaskRequest::TPtr& event) {
+ Y_UNUSED(event);
+ auto schemeQuery = NormalSelectTask();
+ if (schemeQuery) {
+ InitiateSchemaQueryExecution(*schemeQuery);
+ } else {
+ FinishSuccessfully();
+ }
+ }
+
+ TMaybe<TString> NormalSelectTask() {
+ if (CurrentTaskIndex < static_cast<i32>(Tasks.size())) {
+ return Tasks[CurrentTaskIndex].SQL;
+ }
+ return Nothing();
}
- TMaybe<TString> SelectTask(size_t taskIndex, bool rollback) {
- if (!rollback) {
- if (taskIndex < Tasks.size()) {
- return Tasks[taskIndex].SQL;
+ void NormalHandleExecutionResponse(
+ typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
+ const auto& executeSchemeQueryStatus = event->Get()->Result;
+
+ if (executeSchemeQueryStatus.IsSuccess()) {
+ CurrentTaskIndex++;
+ ScheduleNextTask();
+ } else {
+ SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus);
+
+ auto& task = Tasks[CurrentTaskIndex];
+ if (task.ScheduleErrorRecoverySQLGeneration &&
+ task.ScheduleErrorRecoverySQLGeneration(SelfId(),
+ executeSchemeQueryStatus)) {
+ TransitionToRecoveryState();
+ return;
}
- return Nothing();
+ TransitionToRollbackState();
+ }
+ }
+
+ // Rollback state
+ void RollbackHandleProcessNextTask(
+ typename TEvPrivate::TEvProcessNextTaskRequest::TPtr& event) {
+ Y_UNUSED(event);
+
+ auto rollbackSchemeQuery = RollbackSelectTask();
+ if (rollbackSchemeQuery) {
+ InitiateSchemaQueryExecution(*rollbackSchemeQuery);
+ } else {
+ SendError();
}
+ }
- while (true) {
- const auto& maybeRollback = Tasks[taskIndex].RollbackSQL;
+ TMaybe<TString> RollbackSelectTask() {
+ while (CurrentTaskIndex >= 0) {
+ const auto& maybeRollback = Tasks[CurrentTaskIndex].RollbackSQL;
if (maybeRollback) {
return maybeRollback;
}
- if (taskIndex == 0u) {
- return Nothing();
- }
- taskIndex--;
+ CurrentTaskIndex--;
}
+ return Nothing();
}
- bool InitiateSchemaQueryExecution(size_t taskIndex,
- bool rollback,
- const TMaybe<TStatus>& maybeInitialStatus) {
- CPP_LOG_I(
- "TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId());
- auto schemeQuery = SelectTask(taskIndex, rollback);
- if (schemeQuery) {
- CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. schemeQuery: "
- << schemeQuery);
- Request->Get()
- ->YDBClient
- ->RetryOperation([query = *schemeQuery](TSession session) {
- return session.ExecuteSchemeQuery(query);
- })
- .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(),
- self = SelfId(),
- taskIndex,
- rollback,
- maybeInitialStatus](const TAsyncStatus& future) {
- actorSystem->Send(self,
- new typename TEvPrivate::TEvQueryExecutionResponse{
- std::move(future.GetValueSync()),
- taskIndex,
- rollback,
- std::move(maybeInitialStatus)});
- });
+ void RollbackHandleExecutionResponse(
+ typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
+ const auto& executeSchemeQueryStatus = event->Get()->Result;
+
+ if (executeSchemeQueryStatus.IsSuccess()) {
+ CurrentTaskIndex--;
+ ScheduleNextTask();
+ return;
+ } else {
+ SaveIssues("Couldn't execute rollback SQL", executeSchemeQueryStatus);
+ SendError();
+ return;
+ }
+ }
+
+ // Recovery state
+ void Handle(typename TEvPrivate::TEvRecoveryResponse::TPtr& event) {
+ if (event->Get()->Result.IsSuccess()) {
+ InitiateSchemaQueryExecution(*event->Get()->RecoverySQL);
+ } else {
+ SaveIssues("Failed to generate recovery SQL", event->Get()->Result);
+ TransitionToRollbackState();
+ }
+ }
+
+ void RecoveryHandleExecutionResponse(
+ typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
+ const auto& executeSchemeQueryStatus = event->Get()->Result;
+
+ if (executeSchemeQueryStatus.IsSuccess()) {
+ ClearIssues();
+ TransitionToNormalState();
+ } else {
+ SaveIssues("Failed to execute recovery SQL", event->Get()->Result);
+ TransitionToRollbackState();
}
- return schemeQuery.Defined();
}
- STRICT_STFUNC(StateFunc,
+ // FSM description
+ STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TEvPrivate::TEvQueryExecutionResponse,
+ NormalHandleExecutionResponse);
+ hFunc(TEvPrivate::TEvProcessNextTaskRequest, HandleProcessNextTask);)
+
+ STRICT_STFUNC(
+ RollbackStateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TEvPrivate::TEvQueryExecutionResponse, RollbackHandleExecutionResponse);
+ hFunc(TEvPrivate::TEvProcessNextTaskRequest, RollbackHandleProcessNextTask);)
+
+ STRICT_STFUNC(RecoveryStateFunc,
cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
- hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle);
- )
+ hFunc(TEvPrivate::TEvRecoveryResponse, Handle);
+ hFunc(TEvPrivate::TEvQueryExecutionResponse,
+ RecoveryHandleExecutionResponse);)
+
+ void ScheduleNextTask() {
+ TBase::Send(SelfId(), new typename TEvPrivate::TEvProcessNextTaskRequest{});
+ }
+
+ void TransitionToRollbackState() {
+ CPP_LOG_I("TSchemaQueryYDBActor TransitionToRollbackState. Actor id: "
+ << TBase::SelfId());
+ CurrentTaskIndex--;
+ Become(&TSchemaQueryYDBActor::RollbackStateFunc);
+ ScheduleNextTask();
+ }
+
+ void TransitionToNormalState() {
+ CPP_LOG_I("TSchemaQueryYDBActor TransitionToNormalState. Actor id: "
+ << TBase::SelfId());
+ Become(&TSchemaQueryYDBActor::StateFunc);
+ ScheduleNextTask();
+ }
+
+ void TransitionToRecoveryState() {
+ CPP_LOG_I("TSchemaQueryYDBActor TransitionToRecoveryState. Actor id: "
+ << TBase::SelfId());
+ Become(&TSchemaQueryYDBActor::RecoveryStateFunc);
+ }
- void FinishSuccessfully(bool isAlreadyExistSuccessStatus) {
+ void FinishSuccessfully() {
CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: "
<< TBase::SelfId());
Request->Get()->ComputeYDBOperationWasPerformed = true;
- Request->Get()->ComputeYDBIsAlreadyExistFlag = isAlreadyExistSuccessStatus;
TBase::SendRequestToSender();
}
- void SendError(const TStatus& executeSchemeQueryStatus) {
+ void SendError() {
CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: "
<< TBase::SelfId());
- TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus);
+ TString errorMessage = ErrorMessageFactoryMethod(*FirstStatus, Issues);
- TBase::HandleError(errorMessage,
- executeSchemeQueryStatus.GetStatus(),
- executeSchemeQueryStatus.GetIssues());
+ TBase::HandleError(errorMessage, *FirstStatus, std::move(Issues));
}
+ void SaveIssues(const TString& message, const TStatus& status) {
+ auto issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, message);
+ for (const auto& subIssue : status.GetIssues()) {
+ issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
+ }
+ Issues.AddIssue(std::move(issue));
+ if (!FirstStatus) {
+ FirstStatus = status.GetStatus();
+ }
+ }
- void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
- const auto& executeSchemeQueryStatus = event->Get()->Result;
- auto isRollback = event->Get()->Rollback;
- auto isAlreadyExistSuccessStatus =
- IsAlreadyExistSuccessStatus(executeSchemeQueryStatus);
- auto isExecuteStatementSuccessful =
- executeSchemeQueryStatus.IsSuccess() || isAlreadyExistSuccessStatus;
- auto successExecutionRunMode = isExecuteStatementSuccessful && !isRollback;
- auto successExecutionRollbackMode = isExecuteStatementSuccessful && isRollback;
- auto failedExecutionRunMode = !isExecuteStatementSuccessful && !isRollback;
-
- if (successExecutionRunMode) {
- if (!InitiateSchemaQueryExecution(event->Get()->TaskIndex + 1, false, Nothing())) {
- FinishSuccessfully(isAlreadyExistSuccessStatus);
- return;
- }
- } else if (successExecutionRollbackMode) {
- if (event->Get()->TaskIndex == 0 ||
- !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1,
- true,
- std::move(event->Get()->MaybeInitialStatus))) {
- SendError(*event->Get()->MaybeInitialStatus);
- return;
- }
- } else if (failedExecutionRunMode) {
- if (event->Get()->TaskIndex == 0 ||
- !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1,
- true,
- std::move(event->Get()->Result))) {
- SendError(event->Get()->Result);
- return;
- }
- } else {
- // Failed during rollback
- const auto& initialIssues = *(event->Get()->MaybeInitialStatus);
+ void ClearIssues() {
+ Issues.Clear();
+ FirstStatus.Clear();
+ }
- auto originalIssue =
- MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't execute SQL script");
- for (const auto& subIssue : initialIssues.GetIssues()) {
- originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- auto rollbackIssue =
- MakeErrorIssue(TIssuesIds::INTERNAL_ERROR,
- "Couldn't execute rollback SQL script");
- for (const auto& subIssue : event->Get()->Result.GetIssues()) {
- originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- SendError(TStatus{initialIssues.GetStatus(),
- NYql::TIssues{std::move(originalIssue),
- std::move(rollbackIssue)}});
- return;
+ void InitiateSchemaQueryExecution(const TString& schemeQuery) {
+ CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. Actor id: "
+ << TBase::SelfId() << " SchemeQuery: " << schemeQuery);
+ Request->Get()
+ ->YDBClient
+ ->RetryOperation([query = schemeQuery](TSession session) {
+ return session.ExecuteSchemeQuery(query);
+ })
+ .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(),
+ self = SelfId()](const TAsyncStatus& future) {
+ actorSystem->Send(self,
+ new typename TEvPrivate::TEvQueryExecutionResponse{
+ std::move(future.GetValueSync()),
+ });
+ });
+ }
+
+private:
+ TTasks Tasks;
+ TErrorMessageFactoryMethod ErrorMessageFactoryMethod;
+ i32 CurrentTaskIndex = 0;
+ TMaybe<EStatus> FirstStatus;
+ NYql::TIssues Issues;
+};
+
+class TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor :
+ public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor> {
+public:
+ using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>;
+
+ TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor(
+ NActors::TActorId sender,
+ const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request,
+ TPermissions permissions,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters)
+ : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>(
+ sender, sender, requestTimeout, counters)
+ , Request(request)
+ , Permissions(std::move(permissions)) { }
+
+ void BootstrapImpl() { CheckConnectionExistenceInCPS(); }
+
+ IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) {
+ return new TEvPrivate::TEvRecoveryResponse(
+ Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}});
+ };
+
+ void CheckConnectionExistenceInCPS() {
+ FederatedQuery::ListConnectionsRequest result;
+ auto connectionName = Request->Get()->Request.content().name();
+ result.mutable_filter()->set_name(connectionName);
+ result.set_limit(2);
+
+ auto event =
+ new TEvControlPlaneStorage::TEvListConnectionsRequest(Request->Get()->Scope,
+ result,
+ Request->Get()->User,
+ Request->Get()->Token,
+ Request->Get()->CloudId,
+ Permissions,
+ Request->Get()->Quotas,
+ Request->Get()->TenantInfo,
+ {});
+
+ event->IsExactNameMatch = true;
+
+ TBase::Send(NFq::ControlPlaneStorageServiceActorId(), event);
+ }
+
+ STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TEvControlPlaneStorage::TEvListConnectionsResponse, Handle);)
+
+ void Handle(const TEvControlPlaneStorage::TEvListConnectionsResponse::TPtr& event) {
+ auto connectionSize = event->Get()->Result.connection_size();
+ if (connectionSize != 0) {
+ // Already exist in CPS
+ TBase::SendErrorMessageToSender(
+ new TEvPrivate::TEvRecoveryResponse(Nothing(),
+ TStatus{EStatus::TIMEOUT, {}}));
}
+ TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse(
+ MakeDeleteExternalDataSourceQuery(Request->Get()->Request.content().name()),
+ TStatus{EStatus::SUCCESS, {}}));
}
private:
- bool IsAlreadyExistSuccessStatus(const TStatus& status) const {
- return SuccessOnAlreadyExists &&
- (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
- status.GetIssues().ToOneLineString().Contains("error: path exist"));
+ NActors::TActorId Sender;
+ const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& Request;
+ TPermissions Permissions;
+};
+
+class TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor :
+ public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor> {
+public:
+ using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>;
+
+ TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor(
+ NActors::TActorId sender,
+ const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request,
+ TPermissions permissions,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters)
+ : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>(
+ sender, sender, requestTimeout, counters)
+ , Request(request)
+ , Permissions(std::move(permissions)) { }
+
+ void BootstrapImpl() { CheckBindingExistenceInCPS(); }
+
+ IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) {
+ return new TEvPrivate::TEvRecoveryResponse(
+ Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}});
+ };
+
+ void CheckBindingExistenceInCPS() {
+ FederatedQuery::ListBindingsRequest result;
+ auto bindingName = Request->Get()->Request.content().name();
+ result.mutable_filter()->set_name(bindingName);
+ result.set_limit(2);
+
+ auto event =
+ new TEvControlPlaneStorage::TEvListBindingsRequest(Request->Get()->Scope,
+ result,
+ Request->Get()->User,
+ Request->Get()->Token,
+ Request->Get()->CloudId,
+ Permissions,
+ Request->Get()->Quotas,
+ Request->Get()->TenantInfo,
+ {});
+
+ event->IsExactNameMatch = true;
+
+ TBase::Send(NFq::ControlPlaneStorageServiceActorId(), event);
+ }
+
+ STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TEvControlPlaneStorage::TEvListBindingsResponse, Handle);)
+
+ void Handle(const TEvControlPlaneStorage::TEvListBindingsResponse::TPtr& event) {
+ auto bindingSize = event->Get()->Result.binding_size();
+ if (bindingSize != 0) {
+ // Already exist in CPS
+ TBase::SendErrorMessageToSender(
+ new TEvPrivate::TEvRecoveryResponse(Nothing(),
+ TStatus{EStatus::TIMEOUT, {}}));
+ }
+ TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse(
+ MakeDeleteExternalDataTableQuery(Request->Get()->Request.content().name()),
+ TStatus{EStatus::SUCCESS, {}}));
}
private:
- TTasks Tasks;
- TErrorMessageFactoryMethod ErrorMessageFactoryMethod;
- bool SuccessOnAlreadyExists = false;
+ NActors::TActorId Sender;
+ const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& Request;
+ TPermissions Permissions;
};
/// Connection actors
@@ -260,12 +459,12 @@ NActors::IActor* MakeCreateConnectionActor(
TDuration requestTimeout,
TCounters& counters,
const NConfig::TCommonConfig& commonConfig,
- TSigner::TPtr signer,
- bool successOnAlreadyExists) {
+ TSigner::TPtr signer) {
auto queryFactoryMethod =
[objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
- signer = std::move(signer)](
- const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
+ signer = std::move(signer),
+ requestTimeout,
+ &counters](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
auto& connectionContent = request->Get()->Request.content();
@@ -283,14 +482,37 @@ NActors::IActor* MakeCreateConnectionActor(
connectionContent.name(),
signer)});
}
+
+ TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
+ [&request, requestTimeout, &counters](NActors::TActorId sender,
+ const TStatus& status) {
+ Cerr << "Status 1 " << status.GetIssues().ToOneLineString() << Endl;
+ if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
+ status.GetIssues().ToOneLineString().Contains("error: path exist")) {
+ TActivationContext::ActorSystem()->Register(
+ new TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor(
+ sender,
+ request,
+ TPermissions{},
+ requestTimeout,
+ counters.GetCommonCounters(
+ RTC_CREATE_CONNECTION_IN_YDB))); // change counter
+ return true;
+ }
+ return false;
+ };
statements.push_back(
TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
- connectionContent, objectStorageEndpoint, signer)}});
+ connectionContent, objectStorageEndpoint, signer)},
+ .ScheduleErrorRecoverySQLGeneration =
+ alreadyExistRecoveryActorFactoryMethod});
return statements;
};
- auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
+ auto errorMessageFactoryMethod = [](const EStatus status,
+ const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(issues);
+ if (status == NYdb::EStatus::ALREADY_EXISTS) {
return "External data source with such name already exists";
} else {
return "Couldn't create external data source in YDB";
@@ -304,8 +526,7 @@ NActors::IActor* MakeCreateConnectionActor(
requestTimeout,
counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB),
queryFactoryMethod,
- errorMessageFactoryMethod,
- successOnAlreadyExists);
+ errorMessageFactoryMethod);
}
NActors::IActor* MakeModifyConnectionActor(
@@ -405,8 +626,10 @@ NActors::IActor* MakeModifyConnectionActor(
return statements;
};
- auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- Y_UNUSED(queryStatus);
+ auto errorMessageFactoryMethod = [](const EStatus status,
+ const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(status);
+ Y_UNUSED(issues);
return "Couldn't modify external data source in YDB";
};
@@ -455,8 +678,10 @@ NActors::IActor* MakeDeleteConnectionActor(
return statements;
};
- auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- Y_UNUSED(queryStatus);
+ auto errorMessageFactoryMethod = [](const EStatus status,
+ const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(status);
+ Y_UNUSED(issues);
return "Couldn't delete external data source in YDB";
};
@@ -470,55 +695,49 @@ NActors::IActor* MakeDeleteConnectionActor(
errorMessageFactoryMethod);
}
-NActors::IActor* MakeDropCreateConnectionActor(
- const TActorId& proxyActorId,
- TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
- TDuration requestTimeout,
- TCounters& counters,
- const NConfig::TCommonConfig& commonConfig,
- TSigner::TPtr signer) {
- auto queryFactoryMethod =
- [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
- signer = std::move(signer)](
- const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
- -> std::vector<TSchemaQueryTask> {
- auto& connectionContent = request->Get()->Request.content();
- return {TSchemaQueryTask{.SQL = TString{MakeDeleteExternalDataSourceQuery(
- connectionContent.name())}},
- TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
- connectionContent, objectStorageEndpoint, signer)}}};
- };
- auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- Y_UNUSED(queryStatus);
- return "Couldn't recreate external source in YDB";
- };
-
- return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest,
- TEvControlPlaneProxy::TEvCreateConnectionResponse>(
- proxyActorId,
- std::move(request),
- requestTimeout,
- counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB),
- queryFactoryMethod,
- errorMessageFactoryMethod);
-}
-
/// Bindings actors
NActors::IActor* MakeCreateBindingActor(
const TActorId& proxyActorId,
TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
TDuration requestTimeout,
- TCounters& counters,
- bool successOnAlreadyExists) {
+ TCounters& counters) {
auto queryFactoryMethod =
- [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) -> TString {
- auto externalSourceName = *request->Get()->ConnectionName;
- return MakeCreateExternalDataTableQuery(request->Get()->Request.content(),
- externalSourceName);
+ [requestTimeout,
+ &counters](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request)
+ -> std::vector<TSchemaQueryTask> {
+ auto& bindingContent = request->Get()->Request.content();
+ auto& externalSourceName = *request->Get()->ConnectionName;
+ std::vector<TSchemaQueryTask> statements;
+
+ TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
+ [&request, requestTimeout, &counters](NActors::TActorId sender,
+ const TStatus& status) {
+ Cerr << "Status 1 " << status.GetIssues().ToOneLineString() << Endl;
+ if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
+ status.GetIssues().ToOneLineString().Contains("error: path exist")) {
+ TActivationContext::ActorSystem()->Register(
+ new TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor(
+ sender,
+ request,
+ TPermissions{},
+ requestTimeout,
+ counters.GetCommonCounters(
+ RTC_CREATE_CONNECTION_IN_YDB))); // change counter
+ return true;
+ }
+ return false;
+ };
+ statements.push_back(TSchemaQueryTask{
+ .SQL = TString{MakeCreateExternalDataTableQuery(bindingContent,
+ externalSourceName)},
+ .ScheduleErrorRecoverySQLGeneration = alreadyExistRecoveryActorFactoryMethod});
+ return statements;
};
- auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
+ auto errorMessageFactoryMethod = [](const EStatus status,
+ const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(issues);
+ if (status == NYdb::EStatus::ALREADY_EXISTS) {
return "External data table with such name already exists";
} else {
return "Couldn't create external data table in YDB";
@@ -532,8 +751,7 @@ NActors::IActor* MakeCreateBindingActor(
requestTimeout,
counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB),
queryFactoryMethod,
- errorMessageFactoryMethod,
- successOnAlreadyExists);
+ errorMessageFactoryMethod);
}
NActors::IActor* MakeModifyBindingActor(
@@ -542,13 +760,15 @@ NActors::IActor* MakeModifyBindingActor(
TDuration requestTimeout,
TCounters& counters) {
auto queryFactoryMethod =
- [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> std::vector<TSchemaQueryTask> {
+ [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request)
+ -> std::vector<TSchemaQueryTask> {
auto sourceName = *request->Get()->ConnectionName;
auto oldTableName = request->Get()->OldBindingContent->name();
auto deleteOldEntities = MakeDeleteExternalDataTableQuery(oldTableName);
auto createOldEntities =
- MakeCreateExternalDataTableQuery(*request->Get()->OldBindingContent, sourceName);
+ MakeCreateExternalDataTableQuery(*request->Get()->OldBindingContent,
+ sourceName);
auto createNewEntities =
MakeCreateExternalDataTableQuery(request->Get()->Request.content(), sourceName);
@@ -556,8 +776,10 @@ NActors::IActor* MakeModifyBindingActor(
TSchemaQueryTask{.SQL = createNewEntities}};
};
- auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- Y_UNUSED(queryStatus);
+ auto errorMessageFactoryMethod = [](const EStatus status,
+ const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(status);
+ Y_UNUSED(issues);
return "Couldn't modify external data table in YDB";
};
@@ -581,8 +803,10 @@ NActors::IActor* MakeDeleteBindingActor(
return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName);
};
- auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- Y_UNUSED(queryStatus);
+ auto errorMessageFactoryMethod = [](const EStatus status,
+ const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(status);
+ Y_UNUSED(issues);
return "Couldn't delete external data source in YDB";
};
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
index 0df17eca66d..c8ef2449e16 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
@@ -17,8 +17,7 @@ NActors::IActor* MakeCreateConnectionActor(
TDuration requestTimeout,
TCounters& counters,
const NConfig::TCommonConfig& commonConfig,
- TSigner::TPtr signer,
- bool successOnAlreadyExists = false);
+ TSigner::TPtr signer);
NActors::IActor* MakeModifyConnectionActor(
const NActors::TActorId& proxyActorId,
@@ -36,21 +35,12 @@ NActors::IActor* MakeDeleteConnectionActor(
const NConfig::TCommonConfig& commonConfig,
TSigner::TPtr signer);
-NActors::IActor* MakeDropCreateConnectionActor(
- const NActors::TActorId& proxyActorId,
- TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
- TDuration requestTimeout,
- TCounters& counters,
- const NConfig::TCommonConfig& commonConfig,
- TSigner::TPtr signer);
-
/// Binding manipulation actors
NActors::IActor* MakeCreateBindingActor(
const NActors::TActorId& proxyActorId,
TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
TDuration requestTimeout,
- TCounters& counters,
- bool successOnAlreadyExists = false);
+ TCounters& counters);
NActors::IActor* MakeModifyBindingActor(
const NActors::TActorId& proxyActorId,
diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
index 3022997a06a..05f5cf39a4d 100644
--- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -1350,53 +1350,13 @@ private:
CredentialsProviderFactory);
}
- if (!ev->Get()->ComputeYDBOperationWasPerformed && !ev->Get()->ComputeYDBIsAlreadyExistFlag) {
+ if (!ev->Get()->ComputeYDBOperationWasPerformed) {
Register(NPrivate::MakeCreateConnectionActor(ControlPlaneProxyActorId(),
std::move(ev),
Config.RequestTimeout,
Counters,
Config.CommonConfig,
- Signer,
- true));
- return;
- }
-
- if (ev->Get()->ComputeYDBIsAlreadyExistFlag && !ev->Get()->CPSListingFinished) {
- ev->Get()->ComputeYDBOperationWasPerformed = false;
- Register(MakeListConnectionActor(ControlPlaneProxyActorId(),
- std::move(ev),
- Counters,
- Config.RequestTimeout,
- availablePermissions));
- return;
- }
-
- if (ev->Get()->ComputeYDBIsAlreadyExistFlag &&
- !ev->Get()->ComputeYDBOperationWasPerformed) {
- if (ev->Get()->CPSConnectionCount != 0) {
- CPS_LOG_E("CreateConnectionRequest, Connection with such name already exists: "
- << scope << " " << user << " " << NKikimr::MaskTicket(token)
- << " " << request.DebugString());
- Send(ev->Sender,
- new TEvControlPlaneProxy::TEvCreateConnectionResponse(
- NYql::TIssues{
- NYql::TIssue{"Connection with such name already exists"}},
- subjectType),
- 0,
- ev->Cookie);
- requestCounters.IncError();
- TDuration delta = TInstant::Now() - startTime;
- requestCounters.Common->LatencyMs->Collect(delta.MilliSeconds());
- probe(delta, false, false);
- return;
- }
-
- Register(NPrivate::MakeDropCreateConnectionActor(ControlPlaneProxyActorId(),
- std::move(ev),
- Config.RequestTimeout,
- Counters,
- Config.CommonConfig,
- Signer));
+ Signer));
return;
}
}
diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h
index ebd103ae226..54f49d06d07 100644
--- a/ydb/core/fq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h
@@ -312,11 +312,6 @@ struct TEvControlPlaneProxy {
EvCreateConnectionRequest>::TBaseControlPlaneRequest;
bool ComputeYDBIsAlreadyExistFlag = false;
- // Check that connection does not exist in CPS
- bool CPSListingFinished = false;
- size_t CPSConnectionCount = 0;
- // ??
- bool IsInDeleteCreateState = false;
};
template<>