diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-03-22 17:34:50 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-03-22 17:34:50 +0300 |
commit | 88c40e93ac7d4ec019ebde7e91d87d4d4aa51089 (patch) | |
tree | 160ea259b55ac2c3ab32c5ef1e573ae00d555421 | |
parent | 08d6a4c011647144deb0735a743b8daa8edb2610 (diff) | |
download | ydb-88c40e93ac7d4ec019ebde7e91d87d4d4aa51089.tar.gz |
YQ-911 Create schema in actors instead of futures to correctly shutdown
Wait more time in tests
Actors that create table/directory
ref:4f09423ca582baaa3bb668afabceff30182e230c
-rw-r--r-- | CMakeLists.darwin.txt | 4 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 4 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp | 165 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h | 22 | ||||
-rw-r--r-- | ydb/core/yq/libs/events/event_ids.h | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/events/events.h | 9 | ||||
-rw-r--r-- | ydb/core/yq/libs/ydb/CMakeLists.txt | 7 | ||||
-rw-r--r-- | ydb/core/yq/libs/ydb/create_schema.cpp | 294 | ||||
-rw-r--r-- | ydb/core/yq/libs/ydb/create_schema.h | 33 |
9 files changed, 401 insertions, 138 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 46b5d910bb..a3473816b5 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -695,6 +695,8 @@ add_subdirectory(library/cpp/protobuf/interop) add_subdirectory(ydb/core/yq/libs/config) add_subdirectory(ydb/core/yq/libs/control_plane_storage/internal) add_subdirectory(ydb/core/yq/libs/ydb) +add_subdirectory(library/cpp/retry) +add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/library/security) add_subdirectory(ydb/core/yq/libs/db_schema) add_subdirectory(ydb/core/yq/libs/shared_resources) @@ -709,8 +711,6 @@ add_subdirectory(ydb/library/yql/providers/clickhouse/expr_nodes) add_subdirectory(ydb/library/yql/providers/clickhouse/proto) add_subdirectory(ydb/library/yql/providers/common/dq) add_subdirectory(ydb/library/yql/providers/common/http_gateway) -add_subdirectory(library/cpp/retry) -add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/library/yql/providers/common/comp_nodes) add_subdirectory(ydb/library/yql/providers/dq/provider/exec) add_subdirectory(ydb/library/yql/providers/pq/cm_client/interface) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 230e8be22f..c9b847f350 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -775,6 +775,8 @@ add_subdirectory(library/cpp/protobuf/interop) add_subdirectory(ydb/core/yq/libs/config) add_subdirectory(ydb/core/yq/libs/control_plane_storage/internal) add_subdirectory(ydb/core/yq/libs/ydb) +add_subdirectory(library/cpp/retry) +add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/library/security) add_subdirectory(ydb/core/yq/libs/db_schema) add_subdirectory(ydb/core/yq/libs/shared_resources) @@ -789,8 +791,6 @@ add_subdirectory(ydb/library/yql/providers/clickhouse/expr_nodes) add_subdirectory(ydb/library/yql/providers/clickhouse/proto) add_subdirectory(ydb/library/yql/providers/common/dq) add_subdirectory(ydb/library/yql/providers/common/http_gateway) -add_subdirectory(library/cpp/retry) -add_subdirectory(library/cpp/retry/protos) add_subdirectory(ydb/library/yql/providers/common/comp_nodes) add_subdirectory(ydb/library/yql/providers/dq/provider/exec) add_subdirectory(ydb/library/yql/providers/pq/cm_client/interface) diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index f889e231c8..9ec0900952 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -2,6 +2,8 @@ #include "validators.h" #include "ydb_control_plane_storage_impl.h" +#include <ydb/core/yq/libs/ydb/create_schema.h> + #include <ydb/library/security/ydb_credentials_provider_factory.h> namespace NYq { @@ -31,6 +33,14 @@ inline YandexQuery::BindingSetting::BindingCase GetBindingType(const TString& ty return static_cast<YandexQuery::BindingSetting::BindingCase>(type); } +ERetryErrorClass RetryFunc(const NYdb::TStatus& status) { + return status.GetStatus() == NYdb::EStatus::OVERLOADED ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry; +} + +TYdbSdkRetryPolicy::TPtr MakeCreateSchemaRetryPolicy() { + return TYdbSdkRetryPolicy::GetExponentialBackoffPolicy(RetryFunc, TDuration::MilliSeconds(10), TDuration::Seconds(1), TDuration::Seconds(5)); +} + } // namespace void TYdbControlPlaneStorageActor::Bootstrap() { @@ -39,16 +49,15 @@ void TYdbControlPlaneStorageActor::Bootstrap() { DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10); YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory); - auto as = NActors::TActivationContext::ActorSystem(); - CreateDirectory(as); - CreateQueriesTable(as); - CreatePendingSmallTable(as); - CreateConnectionsTable(as); - CreateBindingsTable(as); - CreateIdempotencyKeysTable(as); - CreateResultSetsTable(as); - CreateJobsTable(as); - CreateNodesTable(as); + CreateDirectory(); + CreateQueriesTable(); + CreatePendingSmallTable(); + CreateConnectionsTable(); + CreateBindingsTable(); + CreateIdempotencyKeysTable(); + CreateResultSetsTable(); + CreateJobsTable(); + CreateNodesTable(); Become(&TThis::StateFunc); } @@ -74,7 +83,11 @@ TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStora /* * Creating tables */ -TAsyncStatus TYdbControlPlaneStorageActor::CreateQueriesTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc) { + Register(MakeCreateTableActor({}, NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, path, std::move(desc), MakeCreateSchemaRetryPolicy())); +} + +void TYdbControlPlaneStorageActor::CreateQueriesTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, QUERIES_TABLE_NAME); @@ -102,21 +115,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateQueriesTable(TActorSystem* as) .SetTtlSettings(EXPIRE_AT_COLUMN_NAME) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create quries table error: " << status.GetIssues().ToString()); - return CreateQueriesTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } -TAsyncStatus TYdbControlPlaneStorageActor::CreatePendingSmallTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreatePendingSmallTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, PENDING_SMALL_TABLE_NAME); @@ -134,21 +136,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreatePendingSmallTable(TActorSystem* .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, SCOPE_COLUMN_NAME, QUERY_ID_COLUMN_NAME}) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create pending table error: " << status.GetIssues().ToString()); - return CreatePendingSmallTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } -TAsyncStatus TYdbControlPlaneStorageActor::CreateConnectionsTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreateConnectionsTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, CONNECTIONS_TABLE_NAME); @@ -165,34 +156,15 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateConnectionsTable(TActorSystem* .SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, CONNECTION_ID_COLUMN_NAME}) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create connections table error: " << status.GetIssues().ToString()); - return CreateConnectionsTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } -TAsyncStatus TYdbControlPlaneStorageActor::CreateDirectory(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreateDirectory() { - auto schemeClient = NYdb::NScheme::TSchemeClient(YdbConnection->Driver); - return schemeClient.MakeDirectory(YdbConnection->TablePathPrefix).Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create directory error: " << status.GetIssues().ToString()); - return CreateDirectory(as); - } - return future; - }); + Register(MakeCreateDirectoryActor({}, NKikimrServices::YQ_CONTROL_PLANE_STORAGE, YdbConnection, YdbConnection->TablePathPrefix, MakeCreateSchemaRetryPolicy())); } -TAsyncStatus TYdbControlPlaneStorageActor::CreateJobsTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreateJobsTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, JOBS_TABLE_NAME); @@ -208,21 +180,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateJobsTable(TActorSystem* as) .SetTtlSettings(EXPIRE_AT_COLUMN_NAME) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create jobs table error: " << status.GetIssues().ToString()); - return CreateJobsTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } -TAsyncStatus TYdbControlPlaneStorageActor::CreateNodesTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreateNodesTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, NODES_TABLE_NAME); @@ -241,21 +202,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateNodesTable(TActorSystem* as) .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME}) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create nodes table error: " << status.GetIssues().ToString()); - return CreateNodesTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } -TAsyncStatus TYdbControlPlaneStorageActor::CreateBindingsTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreateBindingsTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, BINDINGS_TABLE_NAME); @@ -272,21 +222,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateBindingsTable(TActorSystem* as) .SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, BINDING_ID_COLUMN_NAME}) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create bindings table error: " << status.GetIssues().ToString()); - return CreateBindingsTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } -TAsyncStatus TYdbControlPlaneStorageActor::CreateIdempotencyKeysTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreateIdempotencyKeysTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, IDEMPOTENCY_KEYS_TABLE_NAME); @@ -300,21 +239,10 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateIdempotencyKeysTable(TActorSyst .SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, IDEMPOTENCY_KEY_COLUMN_NAME}) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create idempotency keys table error: " << status.GetIssues().ToString()); - return CreateIdempotencyKeysTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } -TAsyncStatus TYdbControlPlaneStorageActor::CreateResultSetsTable(TActorSystem* as) +void TYdbControlPlaneStorageActor::CreateResultSetsTable() { auto tablePath = JoinPath(YdbConnection->TablePathPrefix, RESULT_SETS_TABLE_NAME); @@ -328,18 +256,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateResultSetsTable(TActorSystem* a .SetPrimaryKeyColumns({RESULT_ID_COLUMN_NAME, RESULT_SET_ID_COLUMN_NAME, ROW_ID_COLUMN_NAME}) .Build(); - return YdbConnection->Client.RetryOperation( - [tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable { - return session.CreateTable(tablePath, TTableDescription(description)); - }) - .Apply([=](const auto& future) { - auto status = future.GetValue(); - if (!IsTableCreated(status)) { - CPS_LOG_AS_E(*as, "create result sets table error: " << status.GetIssues().ToString()); - return CreateResultSetsTable(as); - } - return future; - }); + RunCreateTableActor(tablePath, TTableDescription(description)); } bool TYdbControlPlaneStorageActor::IsSuperUser(const TString& user) diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index b9029cd567..d7157ee93c 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -512,16 +512,18 @@ public: /* * Creating tables */ - TAsyncStatus CreateDirectory(TActorSystem* as); - TAsyncStatus CreateQueriesTable(TActorSystem* as); - TAsyncStatus CreatePendingTable(TActorSystem* as); - TAsyncStatus CreatePendingSmallTable(TActorSystem* as); - TAsyncStatus CreateConnectionsTable(TActorSystem* as); - TAsyncStatus CreateJobsTable(TActorSystem* as); - TAsyncStatus CreateNodesTable(TActorSystem* as); - TAsyncStatus CreateBindingsTable(TActorSystem* as); - TAsyncStatus CreateIdempotencyKeysTable(TActorSystem* as); - TAsyncStatus CreateResultSetsTable(TActorSystem* as); + void CreateDirectory(); + void CreateQueriesTable(); + void CreatePendingTable(); + void CreatePendingSmallTable(); + void CreateConnectionsTable(); + void CreateJobsTable(); + void CreateNodesTable(); + void CreateBindingsTable(); + void CreateIdempotencyKeysTable(); + void CreateResultSetsTable(); + + void RunCreateTableActor(const TString& path, NYdb::NTable::TTableDescription desc); private: /* diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h index b947b9d244..40e97d762b 100644 --- a/ydb/core/yq/libs/events/event_ids.h +++ b/ydb/core/yq/libs/events/event_ids.h @@ -43,6 +43,7 @@ struct TEventIds { EvForwardPingResponse, EvGraphParams, EvRaiseTransientIssues, + EvSchemaCreated, // Special events EvEnd diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index 0ba106b33a..7f2a88528c 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -218,6 +218,15 @@ struct TEvents { NYql::TIssues TransientIssues; }; + + struct TEvSchemaCreated : public NActors::TEventLocal<TEvSchemaCreated, TEventIds::EvSchemaCreated> { + explicit TEvSchemaCreated(NYdb::TStatus result) + : Result(std::move(result)) + { + } + + NYdb::TStatus Result; + }; }; } // namespace NYq diff --git a/ydb/core/yq/libs/ydb/CMakeLists.txt b/ydb/core/yq/libs/ydb/CMakeLists.txt index ff782b6044..3e9c460c51 100644 --- a/ydb/core/yq/libs/ydb/CMakeLists.txt +++ b/ydb/core/yq/libs/ydb/CMakeLists.txt @@ -8,16 +8,23 @@ add_library(yq-libs-ydb) +target_compile_options(yq-libs-ydb PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(yq-libs-ydb PUBLIC contrib-libs-cxxsupp yutil + cpp-actors-core + library-cpp-retry yq-libs-config + yq-libs-events ydb-library-security cpp-client-ydb_scheme cpp-client-ydb_table tools-enum_parser-enum_serialization_runtime ) target_sources(yq-libs-ydb PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/ydb/create_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/ydb/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/ydb/ydb.cpp ) diff --git a/ydb/core/yq/libs/ydb/create_schema.cpp b/ydb/core/yq/libs/ydb/create_schema.cpp new file mode 100644 index 0000000000..51af951a06 --- /dev/null +++ b/ydb/core/yq/libs/ydb/create_schema.cpp @@ -0,0 +1,294 @@ +#include "create_schema.h" + +#include <ydb/core/yq/libs/events/events.h> +#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/log.h> + +#include <util/string/builder.h> +#include <util/system/defaults.h> + +#define LOG_IMPL(level, logRecordStream) \ + LOG_LOG_S(::NActors::TActivationContext::AsActorContext(), ::NActors::NLog:: Y_CAT(PRI_, level), LogComponent, logRecordStream); + +#define SCHEMA_LOG_DEBUG(logRecordStream) LOG_IMPL(DEBUG, logRecordStream) +#define SCHEMA_LOG_INFO(logRecordStream) LOG_IMPL(INFO, logRecordStream) +#define SCHEMA_LOG_WARN(logRecordStream) LOG_IMPL(WARN, logRecordStream) +#define SCHEMA_LOG_ERROR(logRecordStream) LOG_IMPL(ERROR, logRecordStream) + +namespace NYq { + +namespace { + +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + EvCreateSessionResult = EvBegin, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + // Events + struct TEvCreateSessionResult : public NActors::TEventLocal<TEvCreateSessionResult, EvCreateSessionResult> { + explicit TEvCreateSessionResult( + NYdb::NTable::TCreateSessionResult result) + : Result(std::move(result)) + { + } + + NYdb::NTable::TCreateSessionResult Result; + }; +}; + +} // namespace + +class TCreateActorBase : public NActors::TActorBootstrapped<TCreateActorBase> { +public: + TCreateActorBase( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) + : Parent(parent) + , LogComponent(logComponent) + , Connection(std::move(connection)) + , RetryPolicy(std::move(retryPolicy)) + , Cookie(cookie) + { + } + + void Bootstrap() { + Become(&TCreateActorBase::StateFunc); + SCHEMA_LOG_DEBUG("Run create " << GetEntityName() << " actor"); + CallCreate(); + } + +protected: + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvSchemaCreated, Handle) + hFunc(TEvPrivate::TEvCreateSessionResult, Handle) + hFunc(NActors::TEvents::TEvWakeup, Handle) + ); + + virtual TString GetEntityName() const = 0; + + void Handle(TEvents::TEvSchemaCreated::TPtr& ev) { + if (IsTableCreated(ev->Get()->Result)) { + SCHEMA_LOG_DEBUG("Successfully created " << GetEntityName()); + ReplyAndDie(ev); + return; + } + + SCHEMA_LOG_ERROR("Create " << GetEntityName() << " error: " << ev->Get()->Result.GetIssues().ToOneLineString()); + + if (!ScheduleNextAttempt(ev)) { + ReplyAndDie(ev); + } + } + + void Handle(NActors::TEvents::TEvWakeup::TPtr&) { + CallCreate(); + } + + virtual void Handle(TEvPrivate::TEvCreateSessionResult::TPtr&) { + } + + template <class TEventPtr> + [[nodiscard]] bool ScheduleNextAttempt(TEventPtr& ev) { + if (!RetryState) { + RetryState = RetryPolicy->CreateRetryState(); + if (!RetryState) { + return false; + } + } + + auto delayMaybe = RetryState->GetNextRetryDelay(ev->Get()->Result); + if (!delayMaybe) { + return false; + } + + Schedule(*delayMaybe, new NActors::TEvents::TEvWakeup()); + return true; + } + + virtual void CallCreate() { + CallYdbSdk().Subscribe( + [actorId = SelfId(), actorSystem = NActors::TActivationContext::ActorSystem()](const NYdb::TAsyncStatus& result) { + actorSystem->Send(actorId, new TEvents::TEvSchemaCreated(result.GetValue())); + } + ); + } + + virtual NYdb::TAsyncStatus CallYdbSdk() = 0; + + template <class TEventPtr> + void ReplyAndDie(TEventPtr& ev) { + SCHEMA_LOG_DEBUG("Create " << GetEntityName() << ". Reply: " << ev->Get()->Result.GetIssues().ToOneLineString()); + if (Parent) { + Send(Parent, new TEvents::TEvSchemaCreated(ev->Get()->Result), Cookie); + } + PassAway(); + } + +protected: + const NActors::TActorId Parent; + const ui64 LogComponent; + const TYdbConnectionPtr Connection; + const TYdbSdkRetryPolicy::TPtr RetryPolicy; + const ui64 Cookie; + TYdbSdkRetryPolicy::IRetryState::TPtr RetryState; + TMaybe<NYdb::NTable::TSession> Session; +}; + +class TCreateActorBaseWithSession : public TCreateActorBase { +public: + using TCreateActorBase::TCreateActorBase; + +protected: + void Handle(TEvPrivate::TEvCreateSessionResult::TPtr& ev) override { + if (ev->Get()->Result.IsSuccess()) { + SCHEMA_LOG_DEBUG("Create " << GetEntityName() << ". Create session OK"); + Session = ev->Get()->Result.GetSession(); + CallCreate(); + } else { + SCHEMA_LOG_WARN("Create " << GetEntityName() << ". Create session error: " << ev->Get()->Result.GetIssues().ToOneLineString()); + if (!ScheduleNextAttempt(ev)) { + ReplyAndDie(ev); + } + } + } + + void CreateSession() { + Connection->Client.GetSession().Subscribe( + [actorId = SelfId(), actorSystem = NActors::TActivationContext::ActorSystem()](const NYdb::NTable::TAsyncCreateSessionResult& result) { + actorSystem->Send(actorId, new TEvPrivate::TEvCreateSessionResult(result.GetValue())); + } + ); + } + + void CallCreate() override { + if (!Session) { + CreateSession(); + return; + } + + TCreateActorBase::CallCreate(); + } + + void PassAway() override { + TCreateActorBase::PassAway(); + if (Session) { + Session->Close(); + } + } + +protected: + TMaybe<NYdb::NTable::TSession> Session; +}; + +class TCreateTableActor : public TCreateActorBaseWithSession { +public: + TCreateTableActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& tablePath, + const NYdb::NTable::TTableDescription& tableDesc, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) + : TCreateActorBaseWithSession(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie) + , TablePath(tablePath) + , TableDesc(tableDesc) + { + } + +private: + TString GetEntityName() const override { + return TStringBuilder() << "table \"" << TablePath << "\""; + } + + NYdb::TAsyncStatus CallYdbSdk() override { + SCHEMA_LOG_DEBUG("Call create table \"" << TablePath << "\""); + return Session->CreateTable(TablePath, NYdb::NTable::TTableDescription(TableDesc)); + } + +private: + const TString TablePath; + const NYdb::NTable::TTableDescription TableDesc; +}; + +class TCreateDirectoryActor : public TCreateActorBase { +public: + TCreateDirectoryActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& directoryPath, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) + : TCreateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie) + , DirectoryPath(directoryPath) + , SchemeClient(Connection->Driver) + { + } + +private: + TString GetEntityName() const override { + return TStringBuilder() << "directory \"" << DirectoryPath << "\""; + } + + NYdb::TAsyncStatus CallYdbSdk() override { + SCHEMA_LOG_DEBUG("Call create directory \"" << DirectoryPath << "\""); + return SchemeClient.MakeDirectory(DirectoryPath); + } + +private: + const TString DirectoryPath; + NYdb::NScheme::TSchemeClient SchemeClient; +}; + +NActors::IActor* MakeCreateTableActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& tablePath, + const NYdb::NTable::TTableDescription& tableDesc, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) +{ + return new TCreateTableActor( + parent, + logComponent, + std::move(connection), + tablePath, + tableDesc, + std::move(retryPolicy), + cookie + ); +} + +NActors::IActor* MakeCreateDirectoryActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& directoryPath, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) +{ + return new TCreateDirectoryActor( + parent, + logComponent, + std::move(connection), + directoryPath, + std::move(retryPolicy), + cookie + ); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/ydb/create_schema.h b/ydb/core/yq/libs/ydb/create_schema.h new file mode 100644 index 0000000000..b05c0ec01b --- /dev/null +++ b/ydb/core/yq/libs/ydb/create_schema.h @@ -0,0 +1,33 @@ +#pragma once +#include <ydb/core/yq/libs/ydb/ydb.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/retry/retry_policy.h> + +namespace NYq { + +using TYdbSdkRetryPolicy = IRetryPolicy<const NYdb::TStatus&>; + +// Actor that creates table. +// Send TEvSchemaCreated to parent (if any). +NActors::IActor* MakeCreateTableActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& tablePath, + const NYdb::NTable::TTableDescription& tableDesc, + TYdbSdkRetryPolicy::TPtr, + ui64 cookie = 0); + +// Actor that creates directory. +// Send TEvSchemaCreated to parent (if any). +NActors::IActor* MakeCreateDirectoryActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& directoryPath, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie = 0); + +} // namespace NYq |