aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Uzhegov <auzhegov@ydb.tech>2024-02-16 18:36:51 +0300
committerGitHub <noreply@github.com>2024-02-16 18:36:51 +0300
commit5e1e046209a6c2b6e63ed498b1f82e61079e227b (patch)
tree325d59000c1bb56c4fb376808731733f3325546e
parent72c965e55695d8ea52923a97f4946fb8353d9538 (diff)
downloadydb-5e1e046209a6c2b6e63ed498b1f82e61079e227b.tar.gz
[YQ-2728] spurious empty TMaybe fix (#1984)
-rw-r--r--ydb/core/fq/libs/common/util.h2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h22
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h15
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp370
4 files changed, 223 insertions, 186 deletions
diff --git a/ydb/core/fq/libs/common/util.h b/ydb/core/fq/libs/common/util.h
index 86664504d5..658d8e5693 100644
--- a/ydb/core/fq/libs/common/util.h
+++ b/ydb/core/fq/libs/common/util.h
@@ -76,6 +76,6 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection);
TString RemoveDatabaseFromStr(TString str, const TString& substr);
-NYql::TIssues RemoveDatabaseFromIssues(const NYql::TIssues& issues, const TString& str);
+NYql::TIssues RemoveDatabaseFromIssues(const NYql::TIssues& issues, const TString& databasePath);
} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
index 871f54ed7d..e7d68a5e51 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h
@@ -12,30 +12,29 @@
#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
-namespace NFq {
-namespace NPrivate {
+namespace NFq::NPrivate {
using namespace NActors;
using namespace NThreading;
using namespace NYdb;
template<typename TDerived>
-class TPlainBaseActor : public NActors::TActorBootstrapped<TDerived> {
+class TPlainBaseActor : public TActorBootstrapped<TDerived> {
public:
- using TBase = NActors::TActorBootstrapped<TDerived>;
+ using TBase = TActorBootstrapped<TDerived>;
using TBase::Become;
using TBase::SelfId;
using TBase::Send;
public:
TPlainBaseActor(const TActorId& successActorId,
- const TActorId& errorActorId,
- TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters)
+ const TActorId& errorActorId,
+ TDuration requestTimeout,
+ const TRequestCommonCountersPtr& counters)
: Counters(counters)
, SuccessActorId(successActorId)
, ErrorActorId(errorActorId)
- , RequestTimeout(requestTimeout) { }
+ , RequestTimeout(std::move(requestTimeout)) { }
void Bootstrap() {
CPP_LOG_T("TBaseActor Bootstrap started. Actor id: " << SelfId());
@@ -84,7 +83,7 @@ protected:
virtual IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) = 0;
protected:
- const NPrivate::TRequestCommonCountersPtr Counters;
+ const TRequestCommonCountersPtr Counters;
const TActorId SuccessActorId;
const TActorId ErrorActorId;
const TDuration RequestTimeout;
@@ -112,7 +111,7 @@ public:
TBaseActor(const TActorId& proxyActorId,
const TEventRequestPtr request,
TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters)
+ const TRequestCommonCountersPtr& counters)
: TPlainBaseActor<TDerived>(proxyActorId,
request->Sender,
std::move(requestTimeout),
@@ -152,5 +151,4 @@ protected:
const TEventRequestPtr Request;
};
-} // namespace NPrivate
-} // namespace NFq
+} // namespace NFq::NPrivate
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h
index 7be05ece0d..05db07fc50 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h
@@ -12,16 +12,14 @@
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
-namespace NFq {
-namespace NPrivate {
+namespace NFq::NPrivate {
template<class TRequestProto, class TRequest, class TResponse, class TRequestProxy, class TResponseProxy>
class TRequestActor :
- public NActors::TActorBootstrapped<
+ public TActorBootstrapped<
TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>> {
protected:
- using TBase = NActors::TActorBootstrapped<
- TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>>;
+ using TBase = TActorBootstrapped<TRequestActor>;
using TBase::SelfId;
using TBase::Send;
using TBase::PassAway;
@@ -29,7 +27,7 @@ protected:
using TBase::Schedule;
typename TRequestProxy::TPtr RequestProxy;
- ::NFq::TControlPlaneProxyConfig Config;
+ TControlPlaneProxyConfig Config;
TActorId ServiceId;
TRequestCounters Counters;
TInstant StartTime;
@@ -42,7 +40,7 @@ public:
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR";
explicit TRequestActor(typename TRequestProxy::TPtr requestProxy,
- const ::NFq::TControlPlaneProxyConfig& config,
+ const TControlPlaneProxyConfig& config,
const TActorId& serviceId,
const TRequestCounters& counters,
const std::function<void(const TDuration&, bool, bool)>& probe,
@@ -255,5 +253,4 @@ public:
}
};
-}
-}
+} // namespace NFq::NPrivate
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
index bde41a058b..2854598fab 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -11,8 +11,7 @@
#include <ydb/public/api/protos/draft/fq.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
-namespace NFq {
-namespace NPrivate {
+namespace NFq::NPrivate {
using namespace NActors;
using namespace ::NFq::NConfig;
@@ -37,16 +36,16 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
namespace {
using TScheduleErrorRecoverySQLGeneration =
- std::function<bool(NActors::TActorId sender, const TStatus& issues)>;
+ std::function<bool(TActorId sender, const TStatus& issues)>;
using TShouldSkipStepOnError =
std::function<bool(const TStatus& issues)>;
-inline TScheduleErrorRecoverySQLGeneration NoRecoverySQLGeneration() {
+TScheduleErrorRecoverySQLGeneration NoRecoverySQLGeneration() {
return TScheduleErrorRecoverySQLGeneration{};
}
-inline TShouldSkipStepOnError NoSkipOnError() {
+TShouldSkipStepOnError NoSkipOnError() {
return TShouldSkipStepOnError{};
}
@@ -69,20 +68,19 @@ struct TEvPrivate {
"expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
struct TEvProcessNextTaskRequest :
- NActors::TEventLocal<TEvProcessNextTaskRequest, EvProcessNextTaskRequest> { };
+ TEventLocal<TEvProcessNextTaskRequest, EvProcessNextTaskRequest> { };
struct TEvQueryExecutionResponse :
- NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> {
- TEvQueryExecutionResponse(TStatus result)
+ TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> {
+ explicit TEvQueryExecutionResponse(TStatus result)
: Result(std::move(result)) { }
TStatus Result;
};
- struct TEvRecoveryResponse :
- NActors::TEventLocal<TEvRecoveryResponse, EvRecoveryResponse> {
+ struct TEvRecoveryResponse : TEventLocal<TEvRecoveryResponse, EvRecoveryResponse> {
TEvRecoveryResponse(TMaybe<TString> recoverySQL, TStatus result)
- : RecoverySQL(recoverySQL)
+ : RecoverySQL(std::move(recoverySQL))
, Result(std::move(result)) { }
TMaybe<TString> RecoverySQL;
@@ -93,8 +91,8 @@ struct TEvPrivate {
template<class TEventRequest, class TEventResponse>
class TSchemaQueryYDBActor :
public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
-private:
- using TBase = TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>;
+
+ using TBase = TBaseActor<TSchemaQueryYDBActor>;
using TBase::Become;
using TBase::Request;
using TBase::SelfId;
@@ -106,30 +104,33 @@ public:
using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>;
using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>;
using TErrorMessageFactoryMethod =
- std::function<TString(const EStatus status, const NYql::TIssues& issues)>;
+ std::function<TString(EStatus status, const NYql::TIssues& issues)>;
TSchemaQueryYDBActor(const TActorId& proxyActorId,
const TEventRequestPtr request,
TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters,
+ const TRequestCommonCountersPtr& counters,
TQueryFactoryMethod queryFactoryMethod,
TErrorMessageFactoryMethod errorMessageFactoryMethod)
- : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
+ : TBaseActor<TSchemaQueryYDBActor>(
proxyActorId, std::move(request), requestTimeout, counters)
, Tasks{TSchemaQueryTask{.SQL = queryFactoryMethod(Request)}}
- , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
+ , CompletionStatuses(Tasks.size(), ETaskCompletionStatus::NONE)
+ , ErrorMessageFactoryMethod(std::move(errorMessageFactoryMethod))
+ , DBPath(Request->Get()->ComputeDatabase->connection().database()) { }
TSchemaQueryYDBActor(const TActorId& proxyActorId,
const TEventRequestPtr request,
TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters,
+ const TRequestCommonCountersPtr& counters,
TTasksFactoryMethod tasksFactoryMethod,
TErrorMessageFactoryMethod errorMessageFactoryMethod)
- : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
+ : TBaseActor<TSchemaQueryYDBActor>(
proxyActorId, std::move(request), requestTimeout, counters)
- , Tasks(tasksFactoryMethod(Request))
+ , Tasks{tasksFactoryMethod(Request)}
, CompletionStatuses(Tasks.size(), ETaskCompletionStatus::NONE)
- , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
+ , ErrorMessageFactoryMethod(std::move(errorMessageFactoryMethod))
+ , DBPath(Request->Get()->ComputeDatabase->connection().database()) { }
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR";
@@ -271,7 +272,7 @@ public:
RecoveryHandleExecutionResponse);)
void ScheduleNextTask() {
- TBase::Send(SelfId(), new typename TEvPrivate::TEvProcessNextTaskRequest{});
+ TBase::Send(SelfId(), new TEvPrivate::TEvProcessNextTaskRequest{});
}
void TransitionToRollbackState() {
@@ -311,8 +312,7 @@ public:
void SaveIssues(const TString& message, const TStatus& status) {
auto issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, message);
- auto path = Request->Get()->ComputeDatabase->connection().database();
- for (const auto& subIssue : RemoveDatabaseFromIssues(status.GetIssues(), path)) {
+ for (const auto& subIssue : RemoveDatabaseFromIssues(status.GetIssues(), DBPath)) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
}
@@ -327,11 +327,11 @@ public:
FirstStatus.Clear();
}
- static NYdb::TStatus ExtractStatus(const TAsyncStatus& future) {
+ static TStatus ExtractStatus(const TAsyncStatus& future) {
try {
return std::move(future.GetValueSync()); // can throw an exception
} catch (...) {
- return NYdb::TStatus{EStatus::BAD_REQUEST, NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}};
+ return TStatus{EStatus::BAD_REQUEST, NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}};
}
}
@@ -343,10 +343,10 @@ public:
->RetryOperation([query = schemeQuery](TSession session) {
return session.ExecuteSchemeQuery(query);
})
- .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(),
+ .Subscribe([actorSystem = TActivationContext::ActorSystem(),
self = SelfId()](const TAsyncStatus& future) {
actorSystem->Send(self,
- new typename TEvPrivate::TEvQueryExecutionResponse{
+ new TEvPrivate::TEvQueryExecutionResponse{
ExtractStatus(future),
});
});
@@ -371,47 +371,50 @@ private:
i32 CurrentTaskIndex = 0;
TMaybe<EStatus> FirstStatus;
NYql::TIssues Issues;
+ TString DBPath;
};
class TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor :
public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor> {
public:
- using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>;
+ using TBase = TPlainBaseActor;
TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor(
- NActors::TActorId sender,
- const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request,
- TPermissions permissions,
- TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters)
- : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>(
- sender, sender, requestTimeout, counters)
- , Request(request)
- , Permissions(std::move(permissions)) { }
-
- void BootstrapImpl() { CheckConnectionExistenceInCPS(); }
-
- IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) {
+ const TActorId sender,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const TString& cloudId,
+ const TMaybe<TQuotaMap>& quotas,
+ const TTenantInfo::TPtr& tenantInfo,
+ const TString& connectionName,
+ const TPermissions& permissions,
+ const TDuration& requestTimeout,
+ const TRequestCommonCountersPtr& counters)
+ : TPlainBaseActor(sender, sender, requestTimeout, counters)
+ , Scope(scope)
+ , User(user)
+ , Token(token)
+ , CloudId(cloudId)
+ , Quotas(quotas)
+ , TenantInfo(tenantInfo)
+ , ConnectionName(connectionName)
+ , Permissions(permissions) { }
+
+ void BootstrapImpl() override { CheckConnectionExistenceInCPS(); }
+
+ IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) override {
return new TEvPrivate::TEvRecoveryResponse(
Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}});
};
void CheckConnectionExistenceInCPS() {
FederatedQuery::ListConnectionsRequest result;
- auto connectionName = Request->Get()->Request.content().name();
- result.mutable_filter()->set_name(connectionName);
+ result.mutable_filter()->set_name(ConnectionName);
result.set_limit(2);
- auto event =
- new TEvControlPlaneStorage::TEvListConnectionsRequest(Request->Get()->Scope,
- result,
- Request->Get()->User,
- Request->Get()->Token,
- Request->Get()->CloudId,
- Permissions,
- Request->Get()->Quotas,
- Request->Get()->TenantInfo,
- {});
+ auto event = new TEvControlPlaneStorage::TEvListConnectionsRequest(
+ Scope, result, User, Token, CloudId, Permissions, Quotas, TenantInfo, {});
event->IsExactNameMatch = true;
@@ -431,55 +434,62 @@ public:
return;
}
TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse(
- MakeDeleteExternalDataSourceQuery(Request->Get()->Request.content().name()),
+ MakeDeleteExternalDataSourceQuery(ConnectionName),
TStatus{EStatus::SUCCESS, {}}));
}
private:
- NActors::TActorId Sender;
- const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& Request;
- TPermissions Permissions;
+ const TString Scope;
+ const TString User;
+ const TString Token;
+ const TString CloudId;
+ const TMaybe<TQuotaMap> Quotas;
+ const TTenantInfo::TPtr TenantInfo;
+ const TString ConnectionName;
+ const TPermissions Permissions;
};
class TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor :
public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor> {
public:
- using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>;
+ using TBase = TPlainBaseActor;
TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor(
- NActors::TActorId sender,
- const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request,
- TPermissions permissions,
- TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters)
- : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>(
- sender, sender, requestTimeout, counters)
- , Request(request)
- , Permissions(std::move(permissions)) { }
-
- void BootstrapImpl() { CheckBindingExistenceInCPS(); }
-
- IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) {
+ const TActorId& sender,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const TString& cloudId,
+ const TMaybe<TQuotaMap>& quotas,
+ const TTenantInfo::TPtr& tenantInfo,
+ const TString& bindingName,
+ const TPermissions& permissions,
+ const TDuration& requestTimeout,
+ const TRequestCommonCountersPtr& counters)
+ : TPlainBaseActor(sender, sender, requestTimeout, counters)
+ , Scope(scope)
+ , User(user)
+ , Token(token)
+ , CloudId(cloudId)
+ , Quotas(quotas)
+ , TenantInfo(tenantInfo)
+ , BindingName(bindingName)
+ , Permissions(permissions) { }
+
+ void BootstrapImpl() override { CheckBindingExistenceInCPS(); }
+
+ IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) override {
return new TEvPrivate::TEvRecoveryResponse(
Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}});
- };
+ }
void CheckBindingExistenceInCPS() {
FederatedQuery::ListBindingsRequest result;
- auto bindingName = Request->Get()->Request.content().name();
- result.mutable_filter()->set_name(bindingName);
+ result.mutable_filter()->set_name(BindingName);
result.set_limit(2);
- auto event =
- new TEvControlPlaneStorage::TEvListBindingsRequest(Request->Get()->Scope,
- result,
- Request->Get()->User,
- Request->Get()->Token,
- Request->Get()->CloudId,
- Permissions,
- Request->Get()->Quotas,
- Request->Get()->TenantInfo,
- {});
+ auto event = new TEvControlPlaneStorage::TEvListBindingsRequest(
+ Scope, result, User, Token, CloudId, Permissions, Quotas, TenantInfo, {});
event->IsExactNameMatch = true;
@@ -499,14 +509,19 @@ public:
return;
}
TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse(
- MakeDeleteExternalDataTableQuery(Request->Get()->Request.content().name()),
+ MakeDeleteExternalDataTableQuery(BindingName),
TStatus{EStatus::SUCCESS, {}}));
}
private:
- NActors::TActorId Sender;
- const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& Request;
- TPermissions Permissions;
+ const TString Scope;
+ const TString User;
+ const TString Token;
+ const TString CloudId;
+ const TMaybe<TQuotaMap> Quotas;
+ const TTenantInfo::TPtr TenantInfo;
+ const TString BindingName;
+ const TPermissions Permissions;
};
bool IsPathDoesNotExistIssue(const TStatus& status) {
@@ -518,7 +533,7 @@ bool IsPathExistsIssue(const TStatus& status) {
return status.GetIssues().ToOneLineString().Contains("error: path exist");
}
-}
+} // namespace
/// Connection actors
IActor* MakeCreateConnectionActor(
@@ -539,35 +554,48 @@ IActor* MakeCreateConnectionActor(
permissions,
withoutRollback,
commonConfig,
- computeConfig](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
+ computeConfig](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& req)
-> std::vector<TSchemaQueryTask> {
- auto& connectionContent = request->Get()->Request.content();
+ auto& connectionContent = req->Get()->Request.content();
- auto createSecretStatement =
- CreateSecretObjectQuery(connectionContent.setting(),
- connectionContent.name(),
- signer);
+ auto createSecretStatement = CreateSecretObjectQuery(connectionContent.setting(),
+ connectionContent.name(),
+ signer);
std::vector<TSchemaQueryTask> statements;
if (createSecretStatement) {
- statements.push_back(
- TSchemaQueryTask{.SQL = *createSecretStatement,
- .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : NoSkipOnError()});
+ statements.push_back(TSchemaQueryTask{.SQL = *createSecretStatement,
+ .ShouldSkipStepOnError =
+ withoutRollback ? IsPathExistsIssue
+ : NoSkipOnError()});
}
TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
- [&request, requestTimeout, &counters, permissions](TActorId sender,
- const TStatus& status) {
- if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
+ [scope = req->Get()->Scope,
+ user = req->Get()->User,
+ token = req->Get()->Token,
+ cloudId = req->Get()->CloudId,
+ quotas = req->Get()->Quotas,
+ tenantInfo = req->Get()->TenantInfo,
+ connectionName = req->Get()->Request.content().name(),
+ requestTimeout,
+ &counters,
+ permissions](TActorId sender, const TStatus& status) {
+ if (status.GetStatus() == EStatus::ALREADY_EXISTS ||
status.GetIssues().ToOneLineString().Contains("error: path exist")) {
TActivationContext::ActorSystem()->Register(
new TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor(
sender,
- request,
+ scope,
+ user,
+ token,
+ cloudId,
+ quotas,
+ tenantInfo,
+ connectionName,
permissions,
requestTimeout,
- counters.GetCommonCounters(
- RTC_CREATE_CONNECTION_IN_YDB))); // change counter
+ counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB)));
return true;
}
return false;
@@ -576,28 +604,31 @@ IActor* MakeCreateConnectionActor(
.SQL = MakeCreateExternalDataSourceQuery(
connectionContent, signer, commonConfig,
computeConfig.IsReplaceIfExistsSyntaxSupported()),
- .ScheduleErrorRecoverySQLGeneration =
- withoutRollback ? NoRecoverySQLGeneration()
- : alreadyExistRecoveryActorFactoryMethod,
+ .ScheduleErrorRecoverySQLGeneration =
+ withoutRollback
+ ? NoRecoverySQLGeneration()
+ : std::move(alreadyExistRecoveryActorFactoryMethod),
.ShouldSkipStepOnError =
withoutRollback ? IsPathExistsIssue : NoSkipOnError()});
return statements;
};
- auto& connectionName = request->Get()->Request.content().name();
- auto errorMessageFactoryMethod = [connectionId, connectionName](const EStatus status,
- const NYql::TIssues& issues) -> TString {
+ auto errorMessageFactoryMethod =
+ [connectionId, connectionName = request->Get()->Request.content().name()](
+ const EStatus status, const NYql::TIssues& issues) -> TString {
Y_UNUSED(issues);
- TStringBuilder message = TStringBuilder {} << "Synchronization of connection";
+ TStringBuilder message = TStringBuilder{} << "Synchronization of connection";
if (connectionId.Defined()) {
message << " with id '" << connectionId << "'";
}
if (status == NYdb::EStatus::ALREADY_EXISTS) {
- message << " failed, because external data source with name '" << connectionName << "' already exists";
+ message << " failed, because external data source with name '"
+ << connectionName << "' already exists";
} else {
- message << " failed, because creation of external data source with name '" << connectionName << "' wasn't successful";
+ message << " failed, because creation of external data source with name '"
+ << connectionName << "' wasn't successful";
}
- return message;
+ return TString{message};
};
return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest,
@@ -610,12 +641,12 @@ IActor* MakeCreateConnectionActor(
errorMessageFactoryMethod);
}
-NActors::IActor* MakeModifyConnectionActor(
+IActor* MakeModifyConnectionActor(
const TActorId& proxyActorId,
TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- const NConfig::TCommonConfig& commonConfig,
+ const TCommonConfig& commonConfig,
const NFq::TComputeConfig& computeConfig,
TSigner::TPtr signer) {
auto queryFactoryMethod =
@@ -693,9 +724,9 @@ NActors::IActor* MakeModifyConnectionActor(
.ShouldSkipStepOnError = IsPathDoesNotExistIssue});
}
if (createNewSecret) {
- statements.push_back(
- TSchemaQueryTask{.SQL = *createNewSecret,
- .RollbackSQL = DropSecretObjectQuery(newConnectionContent.name())});
+ statements.push_back(TSchemaQueryTask{.SQL = *createNewSecret,
+ .RollbackSQL = DropSecretObjectQuery(
+ newConnectionContent.name())});
}
statements.push_back(
@@ -743,12 +774,12 @@ NActors::IActor* MakeModifyConnectionActor(
errorMessageFactoryMethod);
}
-NActors::IActor* MakeDeleteConnectionActor(
+IActor* MakeDeleteConnectionActor(
const TActorId& proxyActorId,
TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- const NConfig::TCommonConfig& commonConfig,
+ const TCommonConfig& commonConfig,
TSigner::TPtr signer) {
auto queryFactoryMethod =
[signer = std::move(signer),
@@ -760,7 +791,7 @@ NActors::IActor* MakeDeleteConnectionActor(
auto dropSecret =
DropSecretObjectQuery(connectionContent.name());
- std::vector<TSchemaQueryTask> statements = {
+ std::vector statements = {
TSchemaQueryTask{.SQL = TString{MakeDeleteExternalDataSourceQuery(
connectionContent.name())},
.RollbackSQL = MakeCreateExternalDataSourceQuery(
@@ -796,36 +827,47 @@ NActors::IActor* MakeDeleteConnectionActor(
}
/// Bindings actors
-NActors::IActor* MakeCreateBindingActor(
- const TActorId& proxyActorId,
- TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
- TDuration requestTimeout,
- TCounters& counters,
- TPermissions permissions,
- const NFq::TComputeConfig& computeConfig,
- bool withoutRollback,
- TMaybe<TString> bindingId) {
+IActor* MakeCreateBindingActor(const TActorId& proxyActorId,
+ TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ TPermissions permissions,
+ const NFq::TComputeConfig& computeConfig,bool withoutRollback,
+ TMaybe<TString> bindingId) {
auto queryFactoryMethod =
- [requestTimeout,
- &counters, permissions, withoutRollback, computeConfig](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request)
+ [requestTimeout, &counters, permissions, withoutRollback, computeConfig](
+ const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& req)
-> std::vector<TSchemaQueryTask> {
- auto& bindingContent = request->Get()->Request.content();
- auto& externalSourceName = request->Get()->ConnectionContent->name();
+ auto& bindingContent = req->Get()->Request.content();
+ auto& externalSourceName = req->Get()->ConnectionContent->name();
std::vector<TSchemaQueryTask> statements;
TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
- [&request, requestTimeout, &counters, permissions](NActors::TActorId sender,
- const TStatus& status) {
+ [scope = req->Get()->Scope,
+ user = req->Get()->User,
+ token = req->Get()->Token,
+ cloudId = req->Get()->CloudId,
+ quotas = req->Get()->Quotas,
+ tenantInfo = req->Get()->TenantInfo,
+ bindingName = req->Get()->Request.content().name(),
+ requestTimeout,
+ &counters,
+ permissions](TActorId sender, const TStatus& status) {
if (status.GetStatus() == EStatus::ALREADY_EXISTS ||
status.GetIssues().ToOneLineString().Contains("error: path exist")) {
TActivationContext::ActorSystem()->Register(
new TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor(
sender,
- request,
+ scope,
+ user,
+ token,
+ cloudId,
+ quotas,
+ tenantInfo,
+ bindingName,
permissions,
requestTimeout,
- counters.GetCommonCounters(
- RTC_CREATE_BINDING_IN_YDB))); // change counter
+ counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB)));
return true;
}
return false;
@@ -836,27 +878,28 @@ NActors::IActor* MakeCreateBindingActor(
computeConfig.IsReplaceIfExistsSyntaxSupported())},
.ScheduleErrorRecoverySQLGeneration =
withoutRollback ? NoRecoverySQLGeneration()
- : alreadyExistRecoveryActorFactoryMethod,
+ : std::move(alreadyExistRecoveryActorFactoryMethod),
.ShouldSkipStepOnError =
withoutRollback ? IsPathExistsIssue : NoSkipOnError()});
return statements;
};
- auto content = request->Get()->Request.content();
- auto bindingName = content.name();
- auto errorMessageFactoryMethod = [bindingId, bindingName](const EStatus status,
- const NYql::TIssues& issues) -> TString {
+ auto errorMessageFactoryMethod =
+ [bindingId, bindingName = request->Get()->Request.content().name()](
+ const EStatus status, const NYql::TIssues& issues) -> TString {
Y_UNUSED(issues);
- TStringBuilder message = TStringBuilder {} << "Synchronization of binding";
+ TStringBuilder message = TStringBuilder{} << "Synchronization of binding";
if (bindingId.Defined()) {
message << " with id '" << bindingId << "'";
}
- if (status == NYdb::EStatus::ALREADY_EXISTS) {
- message << " failed, because external data table with name '" << bindingName << "' already exists";
+ if (status == EStatus::ALREADY_EXISTS) {
+ message << " failed, because external data table with name '" << bindingName
+ << "' already exists";
} else {
- message << " failed, because creation of external data table with name '" << bindingName << "' wasn't successful";
+ message << " failed, because creation of external data table with name '"
+ << bindingName << "' wasn't successful";
}
- return message;
+ return TString{message};
};
return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateBindingRequest,
@@ -869,11 +912,10 @@ NActors::IActor* MakeCreateBindingActor(
errorMessageFactoryMethod);
}
-NActors::IActor* MakeModifyBindingActor(
- const TActorId& proxyActorId,
- TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request,
- TDuration requestTimeout,
- TCounters& counters,
+IActor* MakeModifyBindingActor(const TActorId& proxyActorId,
+ TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
const NFq::TComputeConfig& computeConfig) {
auto queryFactoryMethod =
[computeConfig](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request)
@@ -921,15 +963,15 @@ NActors::IActor* MakeModifyBindingActor(
errorMessageFactoryMethod);
}
-NActors::IActor* MakeDeleteBindingActor(
- const TActorId& proxyActorId,
- TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request,
- TDuration requestTimeout,
- TCounters& counters) {
+IActor* MakeDeleteBindingActor(const TActorId& proxyActorId,
+ TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters) {
auto queryFactoryMethod =
[](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
- return {{.SQL = MakeDeleteExternalDataTableQuery(request->Get()->OldBindingContent->name()),
+ return {{.SQL = MakeDeleteExternalDataTableQuery(
+ request->Get()->OldBindingContent->name()),
.ShouldSkipStepOnError = IsPathDoesNotExistIssue}};
};
@@ -950,5 +992,5 @@ NActors::IActor* MakeDeleteBindingActor(
errorMessageFactoryMethod);
}
-} // namespace NPrivate
-} // namespace NFq
+} // namespace NFq::NPrivate
+