diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-02-22 12:40:47 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-02-22 12:40:47 +0300 |
commit | f94243367a2275dcaaea1c3e692fe1ed6c484a71 (patch) | |
tree | 869db7afb9252945c9576b1d474f0dcc1c27a743 | |
parent | f5c229ce02c5a7ce2c1b1682f7794cdc2d0a236a (diff) | |
download | ydb-f94243367a2275dcaaea1c3e692fe1ed6c484a71.tar.gz |
Nodes discovery, replication service's mock
30 files changed, 571 insertions, 13 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin.txt b/ydb/core/driver_lib/run/CMakeLists.darwin.txt index 91bdf6787a4..411c78bad7d 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin.txt @@ -95,6 +95,7 @@ target_link_libraries(run PUBLIC tx-long_tx_service-public core-tx-mediator tx-replication-controller + tx-replication-service core-tx-scheme_board core-tx-schemeshard core-tx-sequenceproxy diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index 49d1f9a476a..0f62710fa19 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -96,6 +96,7 @@ target_link_libraries(run PUBLIC tx-long_tx_service-public core-tx-mediator tx-replication-controller + tx-replication-service core-tx-scheme_board core-tx-schemeshard core-tx-sequenceproxy diff --git a/ydb/core/driver_lib/run/CMakeLists.linux.txt b/ydb/core/driver_lib/run/CMakeLists.linux.txt index 49d1f9a476a..0f62710fa19 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux.txt @@ -96,6 +96,7 @@ target_link_libraries(run PUBLIC tx-long_tx_service-public core-tx-mediator tx-replication-controller + tx-replication-service core-tx-scheme_board core-tx-schemeshard core-tx-sequenceproxy diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index c6886b3208c..30a079074eb 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -66,7 +66,8 @@ union TBasicKikimrServicesMask { bool EnableYandexQuery:1; bool EnableSequenceProxyService:1; bool EnableHttpProxy:1; - bool EnableMetadataProvider : 1; + bool EnableMetadataProvider:1; + bool EnableReplicationService:1; }; ui64 Raw; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index d0733d5bd03..cd3ec6c70f7 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -123,6 +123,7 @@ #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/mediator/mediator.h> #include <ydb/core/tx/replication/controller/controller.h> +#include <ydb/core/tx/replication/service/service.h> #include <ydb/core/tx/scheme_board/scheme_board.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/sequenceproxy/sequenceproxy.h> @@ -2454,5 +2455,17 @@ void TFederatedQueryInitializer::InitializeServices(TActorSystemSetup* setup, co ); } +TReplicationServiceInitializer::TReplicationServiceInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) +{ +} + +void TReplicationServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + setup->LocalServices.emplace_back( + NReplication::MakeReplicationServiceId(NodeId), + TActorSetupCmd(NReplication::CreateReplicationService(), TMailboxType::HTSwap, appData->UserPoolId) + ); +} + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 14855b48900..a14c7a7c0fb 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -531,5 +531,12 @@ private: static ui32 IcPort; }; +class TReplicationServiceInitializer : public IKikimrServicesInitializer { +public: + TReplicationServiceInitializer(const TKikimrRunConfig& runConfig); + + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index e07aa51b80b..9427b911ec6 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1503,6 +1503,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TSequenceProxyServiceInitializer(runConfig)); } + if (serviceMask.EnableReplicationService) { + sil->AddServiceInitializer(new TReplicationServiceInitializer(runConfig)); + } + return sil; } diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index b2e2f52c489..b0b1760425c 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -970,7 +970,9 @@ message TActivity { SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603; REPLICATION_CONTROLLER_STREAM_REMOVER = 604; REPLICATION_CONTROLLER_DST_REMOVER = 605; + REPLICATION_CONTROLLER_TENANT_RESOLVER = 606; DISCOVERY_ACTOR = 607; DISCOVERY_CACHE_ACTOR = 608; + REPLICATION_SERVICE = 609; }; }; diff --git a/ydb/core/tx/replication/CMakeLists.txt b/ydb/core/tx/replication/CMakeLists.txt index fe827df6158..e8a77d31b80 100644 --- a/ydb/core/tx/replication/CMakeLists.txt +++ b/ydb/core/tx/replication/CMakeLists.txt @@ -7,4 +7,5 @@ add_subdirectory(controller) +add_subdirectory(service) add_subdirectory(ydb_proxy) diff --git a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt index 15f00b6b4b2..13d52ecab98 100644 --- a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt +++ b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt @@ -15,6 +15,7 @@ target_link_libraries(tx-replication-controller PUBLIC contrib-libs-cxxsupp yutil ydb-core-base + ydb-core-discovery core-engine-minikql ydb-core-protos ydb-core-tablet_flat @@ -28,6 +29,7 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/nodes_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp @@ -37,11 +39,12 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_discoverer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_with_stream.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tenant_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt index a47d27c9265..071d97a5432 100644 --- a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt @@ -16,6 +16,7 @@ target_link_libraries(tx-replication-controller PUBLIC contrib-libs-cxxsupp yutil ydb-core-base + ydb-core-discovery core-engine-minikql ydb-core-protos ydb-core-tablet_flat @@ -29,6 +30,7 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/nodes_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp @@ -38,11 +40,12 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_discoverer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_with_stream.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tenant_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux.txt b/ydb/core/tx/replication/controller/CMakeLists.linux.txt index a47d27c9265..071d97a5432 100644 --- a/ydb/core/tx/replication/controller/CMakeLists.linux.txt +++ b/ydb/core/tx/replication/controller/CMakeLists.linux.txt @@ -16,6 +16,7 @@ target_link_libraries(tx-replication-controller PUBLIC contrib-libs-cxxsupp yutil ydb-core-base + ydb-core-discovery core-engine-minikql ydb-core-protos ydb-core-tablet_flat @@ -29,6 +30,7 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/nodes_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp @@ -38,11 +40,12 @@ target_sources(tx-replication-controller PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_discoverer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_with_stream.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tenant_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 09d0906cc19..b34afd8b861 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -1,6 +1,7 @@ #include "controller.h" #include "controller_impl.h" +#include <ydb/core/discovery/discovery.h> #include <ydb/core/engine/minikql/flat_local_tx_factory.h> namespace NKikimr::NReplication { @@ -56,6 +57,10 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvDropStreamResult, Handle); HFunc(TEvPrivate::TEvCreateDstResult, Handle); HFunc(TEvPrivate::TEvDropDstResult, Handle); + HFunc(TEvPrivate::TEvResolveTenantResult, Handle); + HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle); + HFunc(TEvStateStorage::TEvBoardInfo, Handle); + HFunc(TEvDiscovery::TEvError, Handle); HFunc(TEvents::TEvPoison, Handle); } } @@ -66,6 +71,10 @@ void TController::SwitchToWork(const TActorContext& ctx) { SignalTabletActive(ctx); Become(&TThis::StateWork); + if (!DiscoveryCache) { + DiscoveryCache = ctx.Register(CreateDiscoveryCache()); + } + for (auto& [_, replication] : Replications) { replication->Progress(ctx); } @@ -122,6 +131,60 @@ void TController::Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorCon RunTxDropDstResult(ev, ctx); } +void TController::Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + const auto rid = ev->Get()->ReplicationId; + const auto& tenant = ev->Get()->Tenant; + + auto replication = Find(rid); + if (!replication) { + CLOG_W(ctx, "Unknown replication" + << ": rid# " << rid); + return; + } + + if (ev->Get()->IsSuccess()) { + CLOG_N(ctx, "Tenant resolved" + << ": rid# " << rid + << ", tenant# " << tenant); + + if (!NodesManager.HasTenant(tenant)) { + CLOG_I(ctx, "Discover tenant nodes" + << ": tenant# " << tenant); + NodesManager.DiscoverNodes(tenant, DiscoveryCache, ctx); + } + } else { + CLOG_E(ctx, "Resolve tenant error" + << ": rid# " << rid); + Y_VERIFY(!tenant); + } + + replication->SetTenant(tenant); + replication->Progress(ctx); +} + +void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + const auto& tenant = ev->Get()->Tenant; + if (NodesManager.HasTenant(tenant)) { + CLOG_I(ctx, "Discover tenant nodes" + << ": tenant# " << tenant); + NodesManager.DiscoverNodes(tenant, DiscoveryCache, ctx); + } +} + +void TController::Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + NodesManager.ProcessResponse(ev, ctx); +} + +void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + NodesManager.ProcessResponse(ev, ctx); +} + void TController::Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); @@ -129,6 +192,12 @@ void TController::Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx) replication->Shutdown(ctx); } + if (auto actorId = std::exchange(DiscoveryCache, {})) { + Send(actorId, new TEvents::TEvPoison()); + } + + NodesManager.Shutdown(ctx); + Send(Tablet(), new TEvents::TEvPoison()); Become(&TThis::StateZombie); } diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index c530ca8a906..1e521293e6e 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -1,6 +1,7 @@ #pragma once #include "logging.h" +#include "nodes_manager.h" #include "private_events.h" #include "public_events.h" #include "replication.h" @@ -69,6 +70,10 @@ private: void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx); // local transactions @@ -123,6 +128,10 @@ private: THashMap<ui64, TReplication::TPtr> Replications; THashMap<TPathId, TReplication::TPtr> ReplicationsByPathId; + // discovery + TActorId DiscoveryCache; + TNodesManager NodesManager; + }; // TController } diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp new file mode 100644 index 00000000000..40877c1815e --- /dev/null +++ b/ydb/core/tx/replication/controller/nodes_manager.cpp @@ -0,0 +1,58 @@ +#include "nodes_manager.h" +#include "private_events.h" + +#include <ydb/core/tx/replication/service/service.h> + +namespace NKikimr::NReplication::NController { + +bool TNodesManager::HasTenant(const TString& tenant) const { + return TenantNodes.contains(tenant); +} + +const THashSet<ui32>& TNodesManager::GetNodes(const TString& tenant) const { + Y_VERIFY(HasTenant(tenant)); + return TenantNodes.at(tenant); +} + +void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx) { + TenantNodes.emplace(tenant, THashSet<ui32>()); + NodeDiscoverers.emplace(ctx.Register(CreateDiscoverer(&NService::MakeDiscoveryPath, tenant, ctx.SelfID, cache)), tenant); +} + +void TNodesManager::ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) { + Y_VERIFY(ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); + + auto it = NodeDiscoverers.find(ev->Sender); + if (it == NodeDiscoverers.end()) { + return; + } + + auto& nodes = TenantNodes[it->second]; + nodes.clear(); + + for (const auto& [actorId, _] : ev->Get()->InfoEntries) { + nodes.insert(actorId.NodeId()); + } + + ctx.Schedule(UpdateInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second)); + NodeDiscoverers.erase(it); +} + +void TNodesManager::ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) { + auto it = NodeDiscoverers.find(ev->Sender); + if (it == NodeDiscoverers.end()) { + return; + } + + ctx.Schedule(RetryInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second)); + NodeDiscoverers.erase(it); +} + +void TNodesManager::Shutdown(const TActorContext& ctx) { + for (const auto& [actorId, _] : std::exchange(NodeDiscoverers, {})) { + ctx.Send(actorId, new TEvents::TEvPoison()); + } +} + +} + diff --git a/ydb/core/tx/replication/controller/nodes_manager.h b/ydb/core/tx/replication/controller/nodes_manager.h new file mode 100644 index 00000000000..d07c0de826d --- /dev/null +++ b/ydb/core/tx/replication/controller/nodes_manager.h @@ -0,0 +1,30 @@ +#pragma once + +#include <ydb/core/base/statestorage.h> +#include <ydb/core/discovery/discovery.h> + +#include <util/generic/hash.h> +#include <util/generic/hash_set.h> + +namespace NKikimr::NReplication::NController { + +class TNodesManager { + static constexpr TDuration UpdateInternal = TDuration::Minutes(5); + static constexpr TDuration RetryInternal = TDuration::Seconds(10); + +public: + bool HasTenant(const TString& tenant) const; + const THashSet<ui32>& GetNodes(const TString& tenant) const; + + void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx); + void ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx); + void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); + + void Shutdown(const TActorContext& ctx); + +private: + THashMap<TString, THashSet<ui32>> TenantNodes; + THashMap<TActorId, TString> NodeDiscoverers; +}; + +} diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp index ee5e24c722b..a3313312ef8 100644 --- a/ydb/core/tx/replication/controller/private_events.cpp +++ b/ydb/core/tx/replication/controller/private_events.cpp @@ -145,6 +145,42 @@ TString TEvPrivate::TEvDropReplication::ToString() const { << " }"; } +TEvPrivate::TEvResolveTenantResult::TEvResolveTenantResult(ui64 rid, const TString& tenant) + : ReplicationId(rid) + , Tenant(tenant) + , Success(true) +{ +} + +TEvPrivate::TEvResolveTenantResult::TEvResolveTenantResult(ui64 rid, bool success) + : ReplicationId(rid) + , Success(success) +{ +} + +TString TEvPrivate::TEvResolveTenantResult::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " ReplicationId: " << ReplicationId + << " Tenant: " << Tenant + << " Sucess: " << Success + << " }"; +} + +bool TEvPrivate::TEvResolveTenantResult::IsSuccess() const { + return Success; +} + +TEvPrivate::TEvUpdateTenantNodes::TEvUpdateTenantNodes(const TString& tenant) + : Tenant(tenant) +{ +} + +TString TEvPrivate::TEvUpdateTenantNodes::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Tenant: " << Tenant + << " }"; +} + } Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) { diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index 94b474206c7..70484679305 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -18,6 +18,8 @@ struct TEvPrivate { EvDropStreamResult, EvDropDstResult, EvDropReplication, + EvResolveTenantResult, + EvUpdateTenantNodes, EvEnd, }; @@ -104,6 +106,25 @@ struct TEvPrivate { TString ToString() const override; }; + struct TEvResolveTenantResult: public TEventLocal<TEvResolveTenantResult, EvResolveTenantResult> { + const ui64 ReplicationId; + const TString Tenant; + const bool Success; + + explicit TEvResolveTenantResult(ui64 rid, const TString& tenant); + explicit TEvResolveTenantResult(ui64 rid, bool success); + TString ToString() const override; + + bool IsSuccess() const; + }; + + struct TEvUpdateTenantNodes: public TEventLocal<TEvUpdateTenantNodes, EvUpdateTenantNodes> { + const TString Tenant; + + explicit TEvUpdateTenantNodes(const TString& tenant); + TString ToString() const override; + }; + }; // TEvPrivate } diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index 673dd268f25..d1847dd1d10 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -2,6 +2,7 @@ #include "replication.h" #include "target_discoverer.h" #include "target_table.h" +#include "tenant_resolver.h" #include "util.h" #include <ydb/core/protos/replication.pb.h> @@ -37,7 +38,7 @@ class TReplication::TImpl { paths.emplace_back(target.GetSrcPath(), target.GetDstPath()); } - Discoverer = ctx.Register(CreateTargetDiscoverer(ctx.SelfID, ReplicationId, YdbProxy, std::move(paths))); + TargetDiscoverer = ctx.Register(CreateTargetDiscoverer(ctx.SelfID, ReplicationId, YdbProxy, std::move(paths))); break; } @@ -101,6 +102,10 @@ public: } } + if (!Tenant && !TenantResolver) { + TenantResolver = ctx.Register(CreateTenantResolver(ctx.SelfID, ReplicationId, PathId)); + } + switch (State) { case EState::Ready: if (!Targets) { @@ -124,7 +129,7 @@ public: target->Shutdown(ctx); } - for (auto& x : TVector<TActorId>{Discoverer, YdbProxy}) { + for (auto& x : TVector<TActorId>{TargetDiscoverer, TenantResolver, YdbProxy}) { if (auto actorId = std::exchange(x, {})) { ctx.Send(actorId, new TEvents::TEvPoison()); } @@ -143,6 +148,7 @@ public: private: const ui64 ReplicationId; const TPathId PathId; + TString Tenant; NKikimrReplication::TReplicationConfig Config; EState State = EState::Ready; @@ -150,7 +156,8 @@ private: ui64 NextTargetId = 1; THashMap<ui64, THolder<ITarget>> Targets; TActorId YdbProxy; - TActorId Discoverer; + TActorId TenantResolver; + TActorId TargetDiscoverer; }; // TImpl @@ -232,6 +239,15 @@ ui64 TReplication::GetNextTargetId() const { return Impl->NextTargetId; } +void TReplication::SetTenant(const TString& value) { + Impl->Tenant = value; + Impl->TenantResolver = {}; +} + +const TString& TReplication::GetTenant() const { + return Impl->Tenant; +} + void TReplication::SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId) { DropOp = {sender, opId}; } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index db1ab27046e..5ab51fd34d2 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -100,6 +100,9 @@ public: void SetNextTargetId(ui64 value); ui64 GetNextTargetId() const; + void SetTenant(const TString& value); + const TString& GetTenant() const; + void SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId); const std::optional<TDropOp>& GetDropOp() const; diff --git a/ydb/core/tx/replication/controller/tenant_resolver.cpp b/ydb/core/tx/replication/controller/tenant_resolver.cpp new file mode 100644 index 00000000000..1cf4b8221db --- /dev/null +++ b/ydb/core/tx/replication/controller/tenant_resolver.cpp @@ -0,0 +1,103 @@ +#include "logging.h" +#include "private_events.h" +#include "tenant_resolver.h" + +#include <ydb/core/base/path.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NKikimr::NReplication::NController { + +class TTenantResolver: public TActorBootstrapped<TTenantResolver> { + void Resolve(const TPathId& pathId) { + auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + + auto& entry = request->ResultSet.emplace_back(); + entry.TableId = pathId; + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + entry.RedirectRequired = false; + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto* response = ev->Get()->Request.Get(); + + Y_VERIFY(response->ResultSet.size() == 1); + const auto& entry = response->ResultSet.front(); + + LOG_T("Handle " << ev->Get()->ToString() + << ": entry# " << entry.ToString()); + + switch (entry.Status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: + break; + default: + LOG_W("Unexpected status" + << ": entry# " << entry.ToString()); + return Reply(false); + } + + if (!DomainKey) { + if (!entry.DomainInfo) { + LOG_E("Empty domain info" + << ": entry# " << entry.ToString()); + return Reply(false); + } + + DomainKey = entry.DomainInfo->DomainKey; + Resolve(DomainKey); + } else { + return Reply(CanonizePath(entry.Path)); + } + } + + template <typename... Args> + void Reply(Args&&... args) { + Send(Parent, new TEvPrivate::TEvResolveTenantResult(ReplicationId, std::forward<Args>(args)...)); + PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TENANT_RESOLVER; + } + + explicit TTenantResolver(const TActorId& parent, ui64 rid, const TPathId& pathId) + : Parent(parent) + , ReplicationId(rid) + , PathId(pathId) + , LogPrefix("TenantResolver", ReplicationId) + { + } + + void Bootstrap() { + Resolve(PathId); + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Parent; + const ui64 ReplicationId; + const TPathId PathId; + const TActorLogPrefix LogPrefix; + + TPathId DomainKey; + +}; // TTenantResolver + +IActor* CreateTenantResolver(const TActorId& parent, ui64 rid, const TPathId& pathId) { + return new TTenantResolver(parent, rid, pathId); +} + +} diff --git a/ydb/core/tx/replication/controller/tenant_resolver.h b/ydb/core/tx/replication/controller/tenant_resolver.h new file mode 100644 index 00000000000..b5f75161074 --- /dev/null +++ b/ydb/core/tx/replication/controller/tenant_resolver.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/core/base/defs.h> + +namespace NKikimr::NReplication::NController { + +IActor* CreateTenantResolver(const TActorId& parent, ui64 rid, const TPathId& pathId); + +} diff --git a/ydb/core/tx/replication/controller/tx_create_replication.cpp b/ydb/core/tx/replication/controller/tx_create_replication.cpp index b07ea95ce8c..0899c55c5e6 100644 --- a/ydb/core/tx/replication/controller/tx_create_replication.cpp +++ b/ydb/core/tx/replication/controller/tx_create_replication.cpp @@ -21,7 +21,7 @@ public: bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); - const auto& record = Ev->Get()->Record; + auto& record = Ev->Get()->Record; Result = MakeHolder<TEvController::TEvCreateReplicationResult>(); Result->Record.MutableOperationId()->CopyFrom(record.GetOperationId()); Result->Record.SetOrigin(Self->TabletID()); @@ -36,18 +36,18 @@ public: } NIceDb::TNiceDb db(txc.DB); + const auto rid = Self->SysParams.AllocateReplicationId(db); + CLOG_N(ctx, "Add replication" + << ": rid# " << rid + << ", pathId# " << pathId); - Replication = Self->Add(rid, pathId, record.GetConfig()); db.Table<Schema::Replications>().Key(rid).Update( NIceDb::TUpdate<Schema::Replications::PathOwnerId>(pathId.OwnerId), NIceDb::TUpdate<Schema::Replications::PathLocalId>(pathId.LocalPathId), NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString()) ); - - CLOG_N(ctx, "Add replication" - << ": rid# " << rid - << ", pathId# " << pathId); + Replication = Self->Add(rid, pathId, std::move(*record.MutableConfig())); Result->Record.SetStatus(NKikimrReplication::TEvCreateReplicationResult::SUCCESS); return true; diff --git a/ydb/core/tx/replication/controller/tx_discovery_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp index 686de8fb261..686de8fb261 100644 --- a/ydb/core/tx/replication/controller/tx_discovery_result.cpp +++ b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp diff --git a/ydb/core/tx/replication/service/CMakeLists.darwin.txt b/ydb/core/tx/replication/service/CMakeLists.darwin.txt new file mode 100644 index 00000000000..ce070b006e1 --- /dev/null +++ b/ydb/core/tx/replication/service/CMakeLists.darwin.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-replication-service) +target_compile_options(tx-replication-service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(tx-replication-service PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-base +) +target_sources(tx-replication-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/service/service.cpp +) diff --git a/ydb/core/tx/replication/service/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/service/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..68c0eddf13d --- /dev/null +++ b/ydb/core/tx/replication/service/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-replication-service) +target_compile_options(tx-replication-service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(tx-replication-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-base +) +target_sources(tx-replication-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/service/service.cpp +) diff --git a/ydb/core/tx/replication/service/CMakeLists.linux.txt b/ydb/core/tx/replication/service/CMakeLists.linux.txt new file mode 100644 index 00000000000..68c0eddf13d --- /dev/null +++ b/ydb/core/tx/replication/service/CMakeLists.linux.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-replication-service) +target_compile_options(tx-replication-service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(tx-replication-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-base +) +target_sources(tx-replication-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/service/service.cpp +) diff --git a/ydb/core/tx/replication/service/CMakeLists.txt b/ydb/core/tx/replication/service/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/core/tx/replication/service/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp new file mode 100644 index 00000000000..bc9489a71d7 --- /dev/null +++ b/ydb/core/tx/replication/service/service.cpp @@ -0,0 +1,63 @@ +#include "service.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/path.h> +#include <ydb/core/base/statestorage.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NKikimr::NReplication { + +namespace NService { + +class TReplicationService: public TActorBootstrapped<TReplicationService> { + void RunBoardPublisher() { + const auto& tenant = AppData()->TenantName; + + auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(ExtractDomain(tenant)); + if (!domainInfo) { + return PassAway(); + } + + const auto boardPath = MakeDiscoveryPath(tenant); + const auto groupId = domainInfo->DefaultStateStorageGroup; + BoardPublisher = Register(CreateBoardPublishActor(boardPath, TString(), SelfId(), groupId, 0, true)); + } + + void PassAway() override { + if (auto actorId = std::exchange(BoardPublisher, {})) { + Send(actorId, new TEvents::TEvPoison()); + } + + TActorBootstrapped<TReplicationService>::PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_SERVICE; + } + + void Bootstrap() { + Become(&TThis::StateWork); + RunBoardPublisher(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + TActorId BoardPublisher; + +}; // TReplicationService + +} // NService + +IActor* CreateReplicationService() { + return new NService::TReplicationService(); +} + +} diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h new file mode 100644 index 00000000000..fb113e2d61c --- /dev/null +++ b/ydb/core/tx/replication/service/service.h @@ -0,0 +1,21 @@ +#pragma once + +#include <ydb/core/base/defs.h> + +namespace NKikimr::NReplication { + +namespace NService { + +inline TString MakeDiscoveryPath(const TString& tenant) { + return "rs+" + tenant; +} + +} // NService + +inline TActorId MakeReplicationServiceId(ui32 nodeId) { + return TActorId(nodeId, TStringBuf("ReplictnSvc")); +} + +IActor* CreateReplicationService(); + +} |