aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-03-22 17:34:50 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-03-22 17:34:50 +0300
commit88c40e93ac7d4ec019ebde7e91d87d4d4aa51089 (patch)
tree160ea259b55ac2c3ab32c5ef1e573ae00d555421
parent08d6a4c011647144deb0735a743b8daa8edb2610 (diff)
downloadydb-88c40e93ac7d4ec019ebde7e91d87d4d4aa51089.tar.gz
YQ-911 Create schema in actors instead of futures to correctly shutdown
Wait more time in tests Actors that create table/directory ref:4f09423ca582baaa3bb668afabceff30182e230c
-rw-r--r--CMakeLists.darwin.txt4
-rw-r--r--CMakeLists.linux.txt4
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp165
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h22
-rw-r--r--ydb/core/yq/libs/events/event_ids.h1
-rw-r--r--ydb/core/yq/libs/events/events.h9
-rw-r--r--ydb/core/yq/libs/ydb/CMakeLists.txt7
-rw-r--r--ydb/core/yq/libs/ydb/create_schema.cpp294
-rw-r--r--ydb/core/yq/libs/ydb/create_schema.h33
9 files changed, 401 insertions, 138 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 46b5d910bb..a3473816b5 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -695,6 +695,8 @@ add_subdirectory(library/cpp/protobuf/interop)
add_subdirectory(ydb/core/yq/libs/config)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/internal)
add_subdirectory(ydb/core/yq/libs/ydb)
+add_subdirectory(library/cpp/retry)
+add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/library/security)
add_subdirectory(ydb/core/yq/libs/db_schema)
add_subdirectory(ydb/core/yq/libs/shared_resources)
@@ -709,8 +711,6 @@ add_subdirectory(ydb/library/yql/providers/clickhouse/expr_nodes)
add_subdirectory(ydb/library/yql/providers/clickhouse/proto)
add_subdirectory(ydb/library/yql/providers/common/dq)
add_subdirectory(ydb/library/yql/providers/common/http_gateway)
-add_subdirectory(library/cpp/retry)
-add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/library/yql/providers/common/comp_nodes)
add_subdirectory(ydb/library/yql/providers/dq/provider/exec)
add_subdirectory(ydb/library/yql/providers/pq/cm_client/interface)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 230e8be22f..c9b847f350 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -775,6 +775,8 @@ add_subdirectory(library/cpp/protobuf/interop)
add_subdirectory(ydb/core/yq/libs/config)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/internal)
add_subdirectory(ydb/core/yq/libs/ydb)
+add_subdirectory(library/cpp/retry)
+add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/library/security)
add_subdirectory(ydb/core/yq/libs/db_schema)
add_subdirectory(ydb/core/yq/libs/shared_resources)
@@ -789,8 +791,6 @@ add_subdirectory(ydb/library/yql/providers/clickhouse/expr_nodes)
add_subdirectory(ydb/library/yql/providers/clickhouse/proto)
add_subdirectory(ydb/library/yql/providers/common/dq)
add_subdirectory(ydb/library/yql/providers/common/http_gateway)
-add_subdirectory(library/cpp/retry)
-add_subdirectory(library/cpp/retry/protos)
add_subdirectory(ydb/library/yql/providers/common/comp_nodes)
add_subdirectory(ydb/library/yql/providers/dq/provider/exec)
add_subdirectory(ydb/library/yql/providers/pq/cm_client/interface)
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index f889e231c8..9ec0900952 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -2,6 +2,8 @@
#include "validators.h"
#include "ydb_control_plane_storage_impl.h"
+#include <ydb/core/yq/libs/ydb/create_schema.h>
+
#include <ydb/library/security/ydb_credentials_provider_factory.h>
namespace NYq {
@@ -31,6 +33,14 @@ inline YandexQuery::BindingSetting::BindingCase GetBindingType(const TString& ty
return static_cast<YandexQuery::BindingSetting::BindingCase>(type);
}
+ERetryErrorClass RetryFunc(const NYdb::TStatus& status) {
+ return status.GetStatus() == NYdb::EStatus::OVERLOADED ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;
+}
+
+TYdbSdkRetryPolicy::TPtr MakeCreateSchemaRetryPolicy() {
+ return TYdbSdkRetryPolicy::GetExponentialBackoffPolicy(RetryFunc, TDuration::MilliSeconds(10), TDuration::Seconds(1), TDuration::Seconds(5));
+}
+
} // namespace
void TYdbControlPlaneStorageActor::Bootstrap() {
@@ -39,16 +49,15 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10);
YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory);
- auto as = NActors::TActivationContext::ActorSystem();
- CreateDirectory(as);
- CreateQueriesTable(as);
- CreatePendingSmallTable(as);
- CreateConnectionsTable(as);
- CreateBindingsTable(as);
- CreateIdempotencyKeysTable(as);
- CreateResultSetsTable(as);
- CreateJobsTable(as);
- CreateNodesTable(as);
+ CreateDirectory();
+ CreateQueriesTable();
+ CreatePendingSmallTable();
+ CreateConnectionsTable();
+ CreateBindingsTable();
+ CreateIdempotencyKeysTable();
+ CreateResultSetsTable();
+ CreateJobsTable();
+ CreateNodesTable();
Become(&TThis::StateFunc);
}
@@ -74,7 +83,11 @@ TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStora
/*
* Creating tables
*/
-TAsyncStatus TYdbControlPlaneStorageActor::CreateQueriesTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc) {
+ Register(MakeCreateTableActor({}, NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, path, std::move(desc), MakeCreateSchemaRetryPolicy()));
+}
+
+void TYdbControlPlaneStorageActor::CreateQueriesTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, QUERIES_TABLE_NAME);
@@ -102,21 +115,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateQueriesTable(TActorSystem* as)
.SetTtlSettings(EXPIRE_AT_COLUMN_NAME)
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create quries table error: " << status.GetIssues().ToString());
- return CreateQueriesTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreatePendingSmallTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreatePendingSmallTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, PENDING_SMALL_TABLE_NAME);
@@ -134,21 +136,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreatePendingSmallTable(TActorSystem*
.SetPrimaryKeyColumns({TENANT_COLUMN_NAME, SCOPE_COLUMN_NAME, QUERY_ID_COLUMN_NAME})
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create pending table error: " << status.GetIssues().ToString());
- return CreatePendingSmallTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreateConnectionsTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreateConnectionsTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, CONNECTIONS_TABLE_NAME);
@@ -165,34 +156,15 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateConnectionsTable(TActorSystem*
.SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, CONNECTION_ID_COLUMN_NAME})
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create connections table error: " << status.GetIssues().ToString());
- return CreateConnectionsTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreateDirectory(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreateDirectory()
{
- auto schemeClient = NYdb::NScheme::TSchemeClient(YdbConnection->Driver);
- return schemeClient.MakeDirectory(YdbConnection->TablePathPrefix).Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create directory error: " << status.GetIssues().ToString());
- return CreateDirectory(as);
- }
- return future;
- });
+ Register(MakeCreateDirectoryActor({}, NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, YdbConnection->TablePathPrefix, MakeCreateSchemaRetryPolicy()));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreateJobsTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreateJobsTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, JOBS_TABLE_NAME);
@@ -208,21 +180,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateJobsTable(TActorSystem* as)
.SetTtlSettings(EXPIRE_AT_COLUMN_NAME)
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create jobs table error: " << status.GetIssues().ToString());
- return CreateJobsTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreateNodesTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreateNodesTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, NODES_TABLE_NAME);
@@ -241,21 +202,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateNodesTable(TActorSystem* as)
.SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME})
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create nodes table error: " << status.GetIssues().ToString());
- return CreateNodesTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreateBindingsTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreateBindingsTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, BINDINGS_TABLE_NAME);
@@ -272,21 +222,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateBindingsTable(TActorSystem* as)
.SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, BINDING_ID_COLUMN_NAME})
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create bindings table error: " << status.GetIssues().ToString());
- return CreateBindingsTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreateIdempotencyKeysTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreateIdempotencyKeysTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, IDEMPOTENCY_KEYS_TABLE_NAME);
@@ -300,21 +239,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateIdempotencyKeysTable(TActorSyst
.SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, IDEMPOTENCY_KEY_COLUMN_NAME})
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create idempotency keys table error: " << status.GetIssues().ToString());
- return CreateIdempotencyKeysTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
-TAsyncStatus TYdbControlPlaneStorageActor::CreateResultSetsTable(TActorSystem* as)
+void TYdbControlPlaneStorageActor::CreateResultSetsTable()
{
auto tablePath = JoinPath(YdbConnection->TablePathPrefix, RESULT_SETS_TABLE_NAME);
@@ -328,18 +256,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateResultSetsTable(TActorSystem* a
.SetPrimaryKeyColumns({RESULT_ID_COLUMN_NAME, RESULT_SET_ID_COLUMN_NAME, ROW_ID_COLUMN_NAME})
.Build();
- return YdbConnection->Client.RetryOperation(
- [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
- return session.CreateTable(tablePath, TTableDescription(description));
- })
- .Apply([=](const auto& future) {
- auto status = future.GetValue();
- if (!IsTableCreated(status)) {
- CPS_LOG_AS_E(*as, "create result sets table error: " << status.GetIssues().ToString());
- return CreateResultSetsTable(as);
- }
- return future;
- });
+ RunCreateTableActor(tablePath, TTableDescription(description));
}
bool TYdbControlPlaneStorageActor::IsSuperUser(const TString& user)
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index b9029cd567..d7157ee93c 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -512,16 +512,18 @@ public:
/*
* Creating tables
*/
- TAsyncStatus CreateDirectory(TActorSystem* as);
- TAsyncStatus CreateQueriesTable(TActorSystem* as);
- TAsyncStatus CreatePendingTable(TActorSystem* as);
- TAsyncStatus CreatePendingSmallTable(TActorSystem* as);
- TAsyncStatus CreateConnectionsTable(TActorSystem* as);
- TAsyncStatus CreateJobsTable(TActorSystem* as);
- TAsyncStatus CreateNodesTable(TActorSystem* as);
- TAsyncStatus CreateBindingsTable(TActorSystem* as);
- TAsyncStatus CreateIdempotencyKeysTable(TActorSystem* as);
- TAsyncStatus CreateResultSetsTable(TActorSystem* as);
+ void CreateDirectory();
+ void CreateQueriesTable();
+ void CreatePendingTable();
+ void CreatePendingSmallTable();
+ void CreateConnectionsTable();
+ void CreateJobsTable();
+ void CreateNodesTable();
+ void CreateBindingsTable();
+ void CreateIdempotencyKeysTable();
+ void CreateResultSetsTable();
+
+ void RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc);
private:
/*
diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h
index b947b9d244..40e97d762b 100644
--- a/ydb/core/yq/libs/events/event_ids.h
+++ b/ydb/core/yq/libs/events/event_ids.h
@@ -43,6 +43,7 @@ struct TEventIds {
EvForwardPingResponse,
EvGraphParams,
EvRaiseTransientIssues,
+ EvSchemaCreated,
// Special events
EvEnd
diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h
index 0ba106b33a..7f2a88528c 100644
--- a/ydb/core/yq/libs/events/events.h
+++ b/ydb/core/yq/libs/events/events.h
@@ -218,6 +218,15 @@ struct TEvents {
NYql::TIssues TransientIssues;
};
+
+ struct TEvSchemaCreated : public NActors::TEventLocal<TEvSchemaCreated, TEventIds::EvSchemaCreated> {
+ explicit TEvSchemaCreated(NYdb::TStatus result)
+ : Result(std::move(result))
+ {
+ }
+
+ NYdb::TStatus Result;
+ };
};
} // namespace NYq
diff --git a/ydb/core/yq/libs/ydb/CMakeLists.txt b/ydb/core/yq/libs/ydb/CMakeLists.txt
index ff782b6044..3e9c460c51 100644
--- a/ydb/core/yq/libs/ydb/CMakeLists.txt
+++ b/ydb/core/yq/libs/ydb/CMakeLists.txt
@@ -8,16 +8,23 @@
add_library(yq-libs-ydb)
+target_compile_options(yq-libs-ydb PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(yq-libs-ydb PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-actors-core
+ library-cpp-retry
yq-libs-config
+ yq-libs-events
ydb-library-security
cpp-client-ydb_scheme
cpp-client-ydb_table
tools-enum_parser-enum_serialization_runtime
)
target_sources(yq-libs-ydb PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/ydb/create_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/ydb/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/ydb/ydb.cpp
)
diff --git a/ydb/core/yq/libs/ydb/create_schema.cpp b/ydb/core/yq/libs/ydb/create_schema.cpp
new file mode 100644
index 0000000000..51af951a06
--- /dev/null
+++ b/ydb/core/yq/libs/ydb/create_schema.cpp
@@ -0,0 +1,294 @@
+#include "create_schema.h"
+
+#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <util/string/builder.h>
+#include <util/system/defaults.h>
+
+#define LOG_IMPL(level, logRecordStream) \
+ LOG_LOG_S(::NActors::TActivationContext::AsActorContext(), ::NActors::NLog:: Y_CAT(PRI_, level), LogComponent, logRecordStream);
+
+#define SCHEMA_LOG_DEBUG(logRecordStream) LOG_IMPL(DEBUG, logRecordStream)
+#define SCHEMA_LOG_INFO(logRecordStream) LOG_IMPL(INFO, logRecordStream)
+#define SCHEMA_LOG_WARN(logRecordStream) LOG_IMPL(WARN, logRecordStream)
+#define SCHEMA_LOG_ERROR(logRecordStream) LOG_IMPL(ERROR, logRecordStream)
+
+namespace NYq {
+
+namespace {
+
+struct TEvPrivate {
+ // Event ids
+ enum EEv : ui32 {
+ EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+
+ EvCreateSessionResult = EvBegin,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+
+ // Events
+ struct TEvCreateSessionResult : public NActors::TEventLocal<TEvCreateSessionResult, EvCreateSessionResult> {
+ explicit TEvCreateSessionResult(
+ NYdb::NTable::TCreateSessionResult result)
+ : Result(std::move(result))
+ {
+ }
+
+ NYdb::NTable::TCreateSessionResult Result;
+ };
+};
+
+} // namespace
+
+class TCreateActorBase : public NActors::TActorBootstrapped<TCreateActorBase> {
+public:
+ TCreateActorBase(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+ : Parent(parent)
+ , LogComponent(logComponent)
+ , Connection(std::move(connection))
+ , RetryPolicy(std::move(retryPolicy))
+ , Cookie(cookie)
+ {
+ }
+
+ void Bootstrap() {
+ Become(&TCreateActorBase::StateFunc);
+ SCHEMA_LOG_DEBUG("Run create " << GetEntityName() << " actor");
+ CallCreate();
+ }
+
+protected:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvSchemaCreated, Handle)
+ hFunc(TEvPrivate::TEvCreateSessionResult, Handle)
+ hFunc(NActors::TEvents::TEvWakeup, Handle)
+ );
+
+ virtual TString GetEntityName() const = 0;
+
+ void Handle(TEvents::TEvSchemaCreated::TPtr& ev) {
+ if (IsTableCreated(ev->Get()->Result)) {
+ SCHEMA_LOG_DEBUG("Successfully created " << GetEntityName());
+ ReplyAndDie(ev);
+ return;
+ }
+
+ SCHEMA_LOG_ERROR("Create " << GetEntityName() << " error: " << ev->Get()->Result.GetIssues().ToOneLineString());
+
+ if (!ScheduleNextAttempt(ev)) {
+ ReplyAndDie(ev);
+ }
+ }
+
+ void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
+ CallCreate();
+ }
+
+ virtual void Handle(TEvPrivate::TEvCreateSessionResult::TPtr&) {
+ }
+
+ template <class TEventPtr>
+ [[nodiscard]] bool ScheduleNextAttempt(TEventPtr& ev) {
+ if (!RetryState) {
+ RetryState = RetryPolicy->CreateRetryState();
+ if (!RetryState) {
+ return false;
+ }
+ }
+
+ auto delayMaybe = RetryState->GetNextRetryDelay(ev->Get()->Result);
+ if (!delayMaybe) {
+ return false;
+ }
+
+ Schedule(*delayMaybe, new NActors::TEvents::TEvWakeup());
+ return true;
+ }
+
+ virtual void CallCreate() {
+ CallYdbSdk().Subscribe(
+ [actorId = SelfId(), actorSystem = NActors::TActivationContext::ActorSystem()](const NYdb::TAsyncStatus& result) {
+ actorSystem->Send(actorId, new TEvents::TEvSchemaCreated(result.GetValue()));
+ }
+ );
+ }
+
+ virtual NYdb::TAsyncStatus CallYdbSdk() = 0;
+
+ template <class TEventPtr>
+ void ReplyAndDie(TEventPtr& ev) {
+ SCHEMA_LOG_DEBUG("Create " << GetEntityName() << ". Reply: " << ev->Get()->Result.GetIssues().ToOneLineString());
+ if (Parent) {
+ Send(Parent, new TEvents::TEvSchemaCreated(ev->Get()->Result), Cookie);
+ }
+ PassAway();
+ }
+
+protected:
+ const NActors::TActorId Parent;
+ const ui64 LogComponent;
+ const TYdbConnectionPtr Connection;
+ const TYdbSdkRetryPolicy::TPtr RetryPolicy;
+ const ui64 Cookie;
+ TYdbSdkRetryPolicy::IRetryState::TPtr RetryState;
+ TMaybe<NYdb::NTable::TSession> Session;
+};
+
+class TCreateActorBaseWithSession : public TCreateActorBase {
+public:
+ using TCreateActorBase::TCreateActorBase;
+
+protected:
+ void Handle(TEvPrivate::TEvCreateSessionResult::TPtr& ev) override {
+ if (ev->Get()->Result.IsSuccess()) {
+ SCHEMA_LOG_DEBUG("Create " << GetEntityName() << ". Create session OK");
+ Session = ev->Get()->Result.GetSession();
+ CallCreate();
+ } else {
+ SCHEMA_LOG_WARN("Create " << GetEntityName() << ". Create session error: " << ev->Get()->Result.GetIssues().ToOneLineString());
+ if (!ScheduleNextAttempt(ev)) {
+ ReplyAndDie(ev);
+ }
+ }
+ }
+
+ void CreateSession() {
+ Connection->Client.GetSession().Subscribe(
+ [actorId = SelfId(), actorSystem = NActors::TActivationContext::ActorSystem()](const NYdb::NTable::TAsyncCreateSessionResult& result) {
+ actorSystem->Send(actorId, new TEvPrivate::TEvCreateSessionResult(result.GetValue()));
+ }
+ );
+ }
+
+ void CallCreate() override {
+ if (!Session) {
+ CreateSession();
+ return;
+ }
+
+ TCreateActorBase::CallCreate();
+ }
+
+ void PassAway() override {
+ TCreateActorBase::PassAway();
+ if (Session) {
+ Session->Close();
+ }
+ }
+
+protected:
+ TMaybe<NYdb::NTable::TSession> Session;
+};
+
+class TCreateTableActor : public TCreateActorBaseWithSession {
+public:
+ TCreateTableActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& tablePath,
+ const NYdb::NTable::TTableDescription& tableDesc,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+ : TCreateActorBaseWithSession(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie)
+ , TablePath(tablePath)
+ , TableDesc(tableDesc)
+ {
+ }
+
+private:
+ TString GetEntityName() const override {
+ return TStringBuilder() << "table \"" << TablePath << "\"";
+ }
+
+ NYdb::TAsyncStatus CallYdbSdk() override {
+ SCHEMA_LOG_DEBUG("Call create table \"" << TablePath << "\"");
+ return Session->CreateTable(TablePath, NYdb::NTable::TTableDescription(TableDesc));
+ }
+
+private:
+ const TString TablePath;
+ const NYdb::NTable::TTableDescription TableDesc;
+};
+
+class TCreateDirectoryActor : public TCreateActorBase {
+public:
+ TCreateDirectoryActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& directoryPath,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+ : TCreateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie)
+ , DirectoryPath(directoryPath)
+ , SchemeClient(Connection->Driver)
+ {
+ }
+
+private:
+ TString GetEntityName() const override {
+ return TStringBuilder() << "directory \"" << DirectoryPath << "\"";
+ }
+
+ NYdb::TAsyncStatus CallYdbSdk() override {
+ SCHEMA_LOG_DEBUG("Call create directory \"" << DirectoryPath << "\"");
+ return SchemeClient.MakeDirectory(DirectoryPath);
+ }
+
+private:
+ const TString DirectoryPath;
+ NYdb::NScheme::TSchemeClient SchemeClient;
+};
+
+NActors::IActor* MakeCreateTableActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& tablePath,
+ const NYdb::NTable::TTableDescription& tableDesc,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+{
+ return new TCreateTableActor(
+ parent,
+ logComponent,
+ std::move(connection),
+ tablePath,
+ tableDesc,
+ std::move(retryPolicy),
+ cookie
+ );
+}
+
+NActors::IActor* MakeCreateDirectoryActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& directoryPath,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+{
+ return new TCreateDirectoryActor(
+ parent,
+ logComponent,
+ std::move(connection),
+ directoryPath,
+ std::move(retryPolicy),
+ cookie
+ );
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/ydb/create_schema.h b/ydb/core/yq/libs/ydb/create_schema.h
new file mode 100644
index 0000000000..b05c0ec01b
--- /dev/null
+++ b/ydb/core/yq/libs/ydb/create_schema.h
@@ -0,0 +1,33 @@
+#pragma once
+#include <ydb/core/yq/libs/ydb/ydb.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/retry/retry_policy.h>
+
+namespace NYq {
+
+using TYdbSdkRetryPolicy = IRetryPolicy<const NYdb::TStatus&>;
+
+// Actor that creates table.
+// Send TEvSchemaCreated to parent (if any).
+NActors::IActor* MakeCreateTableActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& tablePath,
+ const NYdb::NTable::TTableDescription& tableDesc,
+ TYdbSdkRetryPolicy::TPtr,
+ ui64 cookie = 0);
+
+// Actor that creates directory.
+// Send TEvSchemaCreated to parent (if any).
+NActors::IActor* MakeCreateDirectoryActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& directoryPath,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie = 0);
+
+} // namespace NYq