aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-02-22 12:40:47 +0300
committerilnaz <ilnaz@ydb.tech>2023-02-22 12:40:47 +0300
commitf94243367a2275dcaaea1c3e692fe1ed6c484a71 (patch)
tree869db7afb9252945c9576b1d474f0dcc1c27a743
parentf5c229ce02c5a7ce2c1b1682f7794cdc2d0a236a (diff)
downloadydb-f94243367a2275dcaaea1c3e692fe1ed6c484a71.tar.gz
Nodes discovery, replication service's mock
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux.txt1
-rw-r--r--ydb/core/driver_lib/run/config.h3
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp13
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h7
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/tx/replication/CMakeLists.txt1
-rw-r--r--ydb/core/tx/replication/controller/CMakeLists.darwin.txt5
-rw-r--r--ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt5
-rw-r--r--ydb/core/tx/replication/controller/CMakeLists.linux.txt5
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp69
-rw-r--r--ydb/core/tx/replication/controller/controller_impl.h9
-rw-r--r--ydb/core/tx/replication/controller/nodes_manager.cpp58
-rw-r--r--ydb/core/tx/replication/controller/nodes_manager.h30
-rw-r--r--ydb/core/tx/replication/controller/private_events.cpp36
-rw-r--r--ydb/core/tx/replication/controller/private_events.h21
-rw-r--r--ydb/core/tx/replication/controller/replication.cpp22
-rw-r--r--ydb/core/tx/replication/controller/replication.h3
-rw-r--r--ydb/core/tx/replication/controller/tenant_resolver.cpp103
-rw-r--r--ydb/core/tx/replication/controller/tenant_resolver.h9
-rw-r--r--ydb/core/tx/replication/controller/tx_create_replication.cpp12
-rw-r--r--ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp (renamed from ydb/core/tx/replication/controller/tx_discovery_result.cpp)0
-rw-r--r--ydb/core/tx/replication/service/CMakeLists.darwin.txt21
-rw-r--r--ydb/core/tx/replication/service/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/tx/replication/service/CMakeLists.linux.txt22
-rw-r--r--ydb/core/tx/replication/service/CMakeLists.txt15
-rw-r--r--ydb/core/tx/replication/service/service.cpp63
-rw-r--r--ydb/core/tx/replication/service/service.h21
30 files changed, 571 insertions, 13 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin.txt b/ydb/core/driver_lib/run/CMakeLists.darwin.txt
index 91bdf6787a4..411c78bad7d 100644
--- a/ydb/core/driver_lib/run/CMakeLists.darwin.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.darwin.txt
@@ -95,6 +95,7 @@ target_link_libraries(run PUBLIC
tx-long_tx_service-public
core-tx-mediator
tx-replication-controller
+ tx-replication-service
core-tx-scheme_board
core-tx-schemeshard
core-tx-sequenceproxy
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
index 49d1f9a476a..0f62710fa19 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
@@ -96,6 +96,7 @@ target_link_libraries(run PUBLIC
tx-long_tx_service-public
core-tx-mediator
tx-replication-controller
+ tx-replication-service
core-tx-scheme_board
core-tx-schemeshard
core-tx-sequenceproxy
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux.txt b/ydb/core/driver_lib/run/CMakeLists.linux.txt
index 49d1f9a476a..0f62710fa19 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux.txt
@@ -96,6 +96,7 @@ target_link_libraries(run PUBLIC
tx-long_tx_service-public
core-tx-mediator
tx-replication-controller
+ tx-replication-service
core-tx-scheme_board
core-tx-schemeshard
core-tx-sequenceproxy
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index c6886b3208c..30a079074eb 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -66,7 +66,8 @@ union TBasicKikimrServicesMask {
bool EnableYandexQuery:1;
bool EnableSequenceProxyService:1;
bool EnableHttpProxy:1;
- bool EnableMetadataProvider : 1;
+ bool EnableMetadataProvider:1;
+ bool EnableReplicationService:1;
};
ui64 Raw;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index d0733d5bd03..cd3ec6c70f7 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -123,6 +123,7 @@
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/mediator/mediator.h>
#include <ydb/core/tx/replication/controller/controller.h>
+#include <ydb/core/tx/replication/service/service.h>
#include <ydb/core/tx/scheme_board/scheme_board.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/sequenceproxy/sequenceproxy.h>
@@ -2454,5 +2455,17 @@ void TFederatedQueryInitializer::InitializeServices(TActorSystemSetup* setup, co
);
}
+TReplicationServiceInitializer::TReplicationServiceInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig)
+{
+}
+
+void TReplicationServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ setup->LocalServices.emplace_back(
+ NReplication::MakeReplicationServiceId(NodeId),
+ TActorSetupCmd(NReplication::CreateReplicationService(), TMailboxType::HTSwap, appData->UserPoolId)
+ );
+}
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 14855b48900..a14c7a7c0fb 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -531,5 +531,12 @@ private:
static ui32 IcPort;
};
+class TReplicationServiceInitializer : public IKikimrServicesInitializer {
+public:
+ TReplicationServiceInitializer(const TKikimrRunConfig& runConfig);
+
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index e07aa51b80b..9427b911ec6 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1503,6 +1503,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TSequenceProxyServiceInitializer(runConfig));
}
+ if (serviceMask.EnableReplicationService) {
+ sil->AddServiceInitializer(new TReplicationServiceInitializer(runConfig));
+ }
+
return sil;
}
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index b2e2f52c489..b0b1760425c 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -970,7 +970,9 @@ message TActivity {
SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603;
REPLICATION_CONTROLLER_STREAM_REMOVER = 604;
REPLICATION_CONTROLLER_DST_REMOVER = 605;
+ REPLICATION_CONTROLLER_TENANT_RESOLVER = 606;
DISCOVERY_ACTOR = 607;
DISCOVERY_CACHE_ACTOR = 608;
+ REPLICATION_SERVICE = 609;
};
};
diff --git a/ydb/core/tx/replication/CMakeLists.txt b/ydb/core/tx/replication/CMakeLists.txt
index fe827df6158..e8a77d31b80 100644
--- a/ydb/core/tx/replication/CMakeLists.txt
+++ b/ydb/core/tx/replication/CMakeLists.txt
@@ -7,4 +7,5 @@
add_subdirectory(controller)
+add_subdirectory(service)
add_subdirectory(ydb_proxy)
diff --git a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt
index 15f00b6b4b2..13d52ecab98 100644
--- a/ydb/core/tx/replication/controller/CMakeLists.darwin.txt
+++ b/ydb/core/tx/replication/controller/CMakeLists.darwin.txt
@@ -15,6 +15,7 @@ target_link_libraries(tx-replication-controller PUBLIC
contrib-libs-cxxsupp
yutil
ydb-core-base
+ ydb-core-discovery
core-engine-minikql
ydb-core-protos
ydb-core-tablet_flat
@@ -28,6 +29,7 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/nodes_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp
@@ -37,11 +39,12 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_discoverer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_with_stream.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tenant_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt
index a47d27c9265..071d97a5432 100644
--- a/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/replication/controller/CMakeLists.linux-aarch64.txt
@@ -16,6 +16,7 @@ target_link_libraries(tx-replication-controller PUBLIC
contrib-libs-cxxsupp
yutil
ydb-core-base
+ ydb-core-discovery
core-engine-minikql
ydb-core-protos
ydb-core-tablet_flat
@@ -29,6 +30,7 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/nodes_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp
@@ -38,11 +40,12 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_discoverer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_with_stream.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tenant_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
diff --git a/ydb/core/tx/replication/controller/CMakeLists.linux.txt b/ydb/core/tx/replication/controller/CMakeLists.linux.txt
index a47d27c9265..071d97a5432 100644
--- a/ydb/core/tx/replication/controller/CMakeLists.linux.txt
+++ b/ydb/core/tx/replication/controller/CMakeLists.linux.txt
@@ -16,6 +16,7 @@ target_link_libraries(tx-replication-controller PUBLIC
contrib-libs-cxxsupp
yutil
ydb-core-base
+ ydb-core-discovery
core-engine-minikql
ydb-core-protos
ydb-core-tablet_flat
@@ -29,6 +30,7 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_creator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/dst_remover.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/logging.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/nodes_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/stream_creator.cpp
@@ -38,11 +40,12 @@ target_sources(tx-replication-controller PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_discoverer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/target_with_stream.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tenant_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_create_stream_result.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_dst_result.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_replication.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/controller/tx_drop_stream_result.cpp
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index 09d0906cc19..b34afd8b861 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -1,6 +1,7 @@
#include "controller.h"
#include "controller_impl.h"
+#include <ydb/core/discovery/discovery.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
namespace NKikimr::NReplication {
@@ -56,6 +57,10 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvDropStreamResult, Handle);
HFunc(TEvPrivate::TEvCreateDstResult, Handle);
HFunc(TEvPrivate::TEvDropDstResult, Handle);
+ HFunc(TEvPrivate::TEvResolveTenantResult, Handle);
+ HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
+ HFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvents::TEvPoison, Handle);
}
}
@@ -66,6 +71,10 @@ void TController::SwitchToWork(const TActorContext& ctx) {
SignalTabletActive(ctx);
Become(&TThis::StateWork);
+ if (!DiscoveryCache) {
+ DiscoveryCache = ctx.Register(CreateDiscoveryCache());
+ }
+
for (auto& [_, replication] : Replications) {
replication->Progress(ctx);
}
@@ -122,6 +131,60 @@ void TController::Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorCon
RunTxDropDstResult(ev, ctx);
}
+void TController::Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ const auto rid = ev->Get()->ReplicationId;
+ const auto& tenant = ev->Get()->Tenant;
+
+ auto replication = Find(rid);
+ if (!replication) {
+ CLOG_W(ctx, "Unknown replication"
+ << ": rid# " << rid);
+ return;
+ }
+
+ if (ev->Get()->IsSuccess()) {
+ CLOG_N(ctx, "Tenant resolved"
+ << ": rid# " << rid
+ << ", tenant# " << tenant);
+
+ if (!NodesManager.HasTenant(tenant)) {
+ CLOG_I(ctx, "Discover tenant nodes"
+ << ": tenant# " << tenant);
+ NodesManager.DiscoverNodes(tenant, DiscoveryCache, ctx);
+ }
+ } else {
+ CLOG_E(ctx, "Resolve tenant error"
+ << ": rid# " << rid);
+ Y_VERIFY(!tenant);
+ }
+
+ replication->SetTenant(tenant);
+ replication->Progress(ctx);
+}
+
+void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+
+ const auto& tenant = ev->Get()->Tenant;
+ if (NodesManager.HasTenant(tenant)) {
+ CLOG_I(ctx, "Discover tenant nodes"
+ << ": tenant# " << tenant);
+ NodesManager.DiscoverNodes(tenant, DiscoveryCache, ctx);
+ }
+}
+
+void TController::Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+ NodesManager.ProcessResponse(ev, ctx);
+}
+
+void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+ NodesManager.ProcessResponse(ev, ctx);
+}
+
void TController::Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
@@ -129,6 +192,12 @@ void TController::Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx)
replication->Shutdown(ctx);
}
+ if (auto actorId = std::exchange(DiscoveryCache, {})) {
+ Send(actorId, new TEvents::TEvPoison());
+ }
+
+ NodesManager.Shutdown(ctx);
+
Send(Tablet(), new TEvents::TEvPoison());
Become(&TThis::StateZombie);
}
diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h
index c530ca8a906..1e521293e6e 100644
--- a/ydb/core/tx/replication/controller/controller_impl.h
+++ b/ydb/core/tx/replication/controller/controller_impl.h
@@ -1,6 +1,7 @@
#pragma once
#include "logging.h"
+#include "nodes_manager.h"
#include "private_events.h"
#include "public_events.h"
#include "replication.h"
@@ -69,6 +70,10 @@ private:
void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx);
// local transactions
@@ -123,6 +128,10 @@ private:
THashMap<ui64, TReplication::TPtr> Replications;
THashMap<TPathId, TReplication::TPtr> ReplicationsByPathId;
+ // discovery
+ TActorId DiscoveryCache;
+ TNodesManager NodesManager;
+
}; // TController
}
diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp
new file mode 100644
index 00000000000..40877c1815e
--- /dev/null
+++ b/ydb/core/tx/replication/controller/nodes_manager.cpp
@@ -0,0 +1,58 @@
+#include "nodes_manager.h"
+#include "private_events.h"
+
+#include <ydb/core/tx/replication/service/service.h>
+
+namespace NKikimr::NReplication::NController {
+
+bool TNodesManager::HasTenant(const TString& tenant) const {
+ return TenantNodes.contains(tenant);
+}
+
+const THashSet<ui32>& TNodesManager::GetNodes(const TString& tenant) const {
+ Y_VERIFY(HasTenant(tenant));
+ return TenantNodes.at(tenant);
+}
+
+void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx) {
+ TenantNodes.emplace(tenant, THashSet<ui32>());
+ NodeDiscoverers.emplace(ctx.Register(CreateDiscoverer(&NService::MakeDiscoveryPath, tenant, ctx.SelfID, cache)), tenant);
+}
+
+void TNodesManager::ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) {
+ Y_VERIFY(ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+
+ auto it = NodeDiscoverers.find(ev->Sender);
+ if (it == NodeDiscoverers.end()) {
+ return;
+ }
+
+ auto& nodes = TenantNodes[it->second];
+ nodes.clear();
+
+ for (const auto& [actorId, _] : ev->Get()->InfoEntries) {
+ nodes.insert(actorId.NodeId());
+ }
+
+ ctx.Schedule(UpdateInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second));
+ NodeDiscoverers.erase(it);
+}
+
+void TNodesManager::ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) {
+ auto it = NodeDiscoverers.find(ev->Sender);
+ if (it == NodeDiscoverers.end()) {
+ return;
+ }
+
+ ctx.Schedule(RetryInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second));
+ NodeDiscoverers.erase(it);
+}
+
+void TNodesManager::Shutdown(const TActorContext& ctx) {
+ for (const auto& [actorId, _] : std::exchange(NodeDiscoverers, {})) {
+ ctx.Send(actorId, new TEvents::TEvPoison());
+ }
+}
+
+}
+
diff --git a/ydb/core/tx/replication/controller/nodes_manager.h b/ydb/core/tx/replication/controller/nodes_manager.h
new file mode 100644
index 00000000000..d07c0de826d
--- /dev/null
+++ b/ydb/core/tx/replication/controller/nodes_manager.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <ydb/core/base/statestorage.h>
+#include <ydb/core/discovery/discovery.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/hash_set.h>
+
+namespace NKikimr::NReplication::NController {
+
+class TNodesManager {
+ static constexpr TDuration UpdateInternal = TDuration::Minutes(5);
+ static constexpr TDuration RetryInternal = TDuration::Seconds(10);
+
+public:
+ bool HasTenant(const TString& tenant) const;
+ const THashSet<ui32>& GetNodes(const TString& tenant) const;
+
+ void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx);
+ void ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx);
+ void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
+
+ void Shutdown(const TActorContext& ctx);
+
+private:
+ THashMap<TString, THashSet<ui32>> TenantNodes;
+ THashMap<TActorId, TString> NodeDiscoverers;
+};
+
+}
diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp
index ee5e24c722b..a3313312ef8 100644
--- a/ydb/core/tx/replication/controller/private_events.cpp
+++ b/ydb/core/tx/replication/controller/private_events.cpp
@@ -145,6 +145,42 @@ TString TEvPrivate::TEvDropReplication::ToString() const {
<< " }";
}
+TEvPrivate::TEvResolveTenantResult::TEvResolveTenantResult(ui64 rid, const TString& tenant)
+ : ReplicationId(rid)
+ , Tenant(tenant)
+ , Success(true)
+{
+}
+
+TEvPrivate::TEvResolveTenantResult::TEvResolveTenantResult(ui64 rid, bool success)
+ : ReplicationId(rid)
+ , Success(success)
+{
+}
+
+TString TEvPrivate::TEvResolveTenantResult::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " ReplicationId: " << ReplicationId
+ << " Tenant: " << Tenant
+ << " Sucess: " << Success
+ << " }";
+}
+
+bool TEvPrivate::TEvResolveTenantResult::IsSuccess() const {
+ return Success;
+}
+
+TEvPrivate::TEvUpdateTenantNodes::TEvUpdateTenantNodes(const TString& tenant)
+ : Tenant(tenant)
+{
+}
+
+TString TEvPrivate::TEvUpdateTenantNodes::ToString() const {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Tenant: " << Tenant
+ << " }";
+}
+
}
Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) {
diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h
index 94b474206c7..70484679305 100644
--- a/ydb/core/tx/replication/controller/private_events.h
+++ b/ydb/core/tx/replication/controller/private_events.h
@@ -18,6 +18,8 @@ struct TEvPrivate {
EvDropStreamResult,
EvDropDstResult,
EvDropReplication,
+ EvResolveTenantResult,
+ EvUpdateTenantNodes,
EvEnd,
};
@@ -104,6 +106,25 @@ struct TEvPrivate {
TString ToString() const override;
};
+ struct TEvResolveTenantResult: public TEventLocal<TEvResolveTenantResult, EvResolveTenantResult> {
+ const ui64 ReplicationId;
+ const TString Tenant;
+ const bool Success;
+
+ explicit TEvResolveTenantResult(ui64 rid, const TString& tenant);
+ explicit TEvResolveTenantResult(ui64 rid, bool success);
+ TString ToString() const override;
+
+ bool IsSuccess() const;
+ };
+
+ struct TEvUpdateTenantNodes: public TEventLocal<TEvUpdateTenantNodes, EvUpdateTenantNodes> {
+ const TString Tenant;
+
+ explicit TEvUpdateTenantNodes(const TString& tenant);
+ TString ToString() const override;
+ };
+
}; // TEvPrivate
}
diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp
index 673dd268f25..d1847dd1d10 100644
--- a/ydb/core/tx/replication/controller/replication.cpp
+++ b/ydb/core/tx/replication/controller/replication.cpp
@@ -2,6 +2,7 @@
#include "replication.h"
#include "target_discoverer.h"
#include "target_table.h"
+#include "tenant_resolver.h"
#include "util.h"
#include <ydb/core/protos/replication.pb.h>
@@ -37,7 +38,7 @@ class TReplication::TImpl {
paths.emplace_back(target.GetSrcPath(), target.GetDstPath());
}
- Discoverer = ctx.Register(CreateTargetDiscoverer(ctx.SelfID, ReplicationId, YdbProxy, std::move(paths)));
+ TargetDiscoverer = ctx.Register(CreateTargetDiscoverer(ctx.SelfID, ReplicationId, YdbProxy, std::move(paths)));
break;
}
@@ -101,6 +102,10 @@ public:
}
}
+ if (!Tenant && !TenantResolver) {
+ TenantResolver = ctx.Register(CreateTenantResolver(ctx.SelfID, ReplicationId, PathId));
+ }
+
switch (State) {
case EState::Ready:
if (!Targets) {
@@ -124,7 +129,7 @@ public:
target->Shutdown(ctx);
}
- for (auto& x : TVector<TActorId>{Discoverer, YdbProxy}) {
+ for (auto& x : TVector<TActorId>{TargetDiscoverer, TenantResolver, YdbProxy}) {
if (auto actorId = std::exchange(x, {})) {
ctx.Send(actorId, new TEvents::TEvPoison());
}
@@ -143,6 +148,7 @@ public:
private:
const ui64 ReplicationId;
const TPathId PathId;
+ TString Tenant;
NKikimrReplication::TReplicationConfig Config;
EState State = EState::Ready;
@@ -150,7 +156,8 @@ private:
ui64 NextTargetId = 1;
THashMap<ui64, THolder<ITarget>> Targets;
TActorId YdbProxy;
- TActorId Discoverer;
+ TActorId TenantResolver;
+ TActorId TargetDiscoverer;
}; // TImpl
@@ -232,6 +239,15 @@ ui64 TReplication::GetNextTargetId() const {
return Impl->NextTargetId;
}
+void TReplication::SetTenant(const TString& value) {
+ Impl->Tenant = value;
+ Impl->TenantResolver = {};
+}
+
+const TString& TReplication::GetTenant() const {
+ return Impl->Tenant;
+}
+
void TReplication::SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId) {
DropOp = {sender, opId};
}
diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h
index db1ab27046e..5ab51fd34d2 100644
--- a/ydb/core/tx/replication/controller/replication.h
+++ b/ydb/core/tx/replication/controller/replication.h
@@ -100,6 +100,9 @@ public:
void SetNextTargetId(ui64 value);
ui64 GetNextTargetId() const;
+ void SetTenant(const TString& value);
+ const TString& GetTenant() const;
+
void SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId);
const std::optional<TDropOp>& GetDropOp() const;
diff --git a/ydb/core/tx/replication/controller/tenant_resolver.cpp b/ydb/core/tx/replication/controller/tenant_resolver.cpp
new file mode 100644
index 00000000000..1cf4b8221db
--- /dev/null
+++ b/ydb/core/tx/replication/controller/tenant_resolver.cpp
@@ -0,0 +1,103 @@
+#include "logging.h"
+#include "private_events.h"
+#include "tenant_resolver.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NKikimr::NReplication::NController {
+
+class TTenantResolver: public TActorBootstrapped<TTenantResolver> {
+ void Resolve(const TPathId& pathId) {
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+
+ auto& entry = request->ResultSet.emplace_back();
+ entry.TableId = pathId;
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
+ entry.RedirectRequired = false;
+
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ const auto* response = ev->Get()->Request.Get();
+
+ Y_VERIFY(response->ResultSet.size() == 1);
+ const auto& entry = response->ResultSet.front();
+
+ LOG_T("Handle " << ev->Get()->ToString()
+ << ": entry# " << entry.ToString());
+
+ switch (entry.Status) {
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
+ break;
+ default:
+ LOG_W("Unexpected status"
+ << ": entry# " << entry.ToString());
+ return Reply(false);
+ }
+
+ if (!DomainKey) {
+ if (!entry.DomainInfo) {
+ LOG_E("Empty domain info"
+ << ": entry# " << entry.ToString());
+ return Reply(false);
+ }
+
+ DomainKey = entry.DomainInfo->DomainKey;
+ Resolve(DomainKey);
+ } else {
+ return Reply(CanonizePath(entry.Path));
+ }
+ }
+
+ template <typename... Args>
+ void Reply(Args&&... args) {
+ Send(Parent, new TEvPrivate::TEvResolveTenantResult(ReplicationId, std::forward<Args>(args)...));
+ PassAway();
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TENANT_RESOLVER;
+ }
+
+ explicit TTenantResolver(const TActorId& parent, ui64 rid, const TPathId& pathId)
+ : Parent(parent)
+ , ReplicationId(rid)
+ , PathId(pathId)
+ , LogPrefix("TenantResolver", ReplicationId)
+ {
+ }
+
+ void Bootstrap() {
+ Resolve(PathId);
+ Become(&TThis::StateWork);
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ const TActorId Parent;
+ const ui64 ReplicationId;
+ const TPathId PathId;
+ const TActorLogPrefix LogPrefix;
+
+ TPathId DomainKey;
+
+}; // TTenantResolver
+
+IActor* CreateTenantResolver(const TActorId& parent, ui64 rid, const TPathId& pathId) {
+ return new TTenantResolver(parent, rid, pathId);
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/tenant_resolver.h b/ydb/core/tx/replication/controller/tenant_resolver.h
new file mode 100644
index 00000000000..b5f75161074
--- /dev/null
+++ b/ydb/core/tx/replication/controller/tenant_resolver.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+
+namespace NKikimr::NReplication::NController {
+
+IActor* CreateTenantResolver(const TActorId& parent, ui64 rid, const TPathId& pathId);
+
+}
diff --git a/ydb/core/tx/replication/controller/tx_create_replication.cpp b/ydb/core/tx/replication/controller/tx_create_replication.cpp
index b07ea95ce8c..0899c55c5e6 100644
--- a/ydb/core/tx/replication/controller/tx_create_replication.cpp
+++ b/ydb/core/tx/replication/controller/tx_create_replication.cpp
@@ -21,7 +21,7 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
CLOG_D(ctx, "Execute: " << Ev->Get()->ToString());
- const auto& record = Ev->Get()->Record;
+ auto& record = Ev->Get()->Record;
Result = MakeHolder<TEvController::TEvCreateReplicationResult>();
Result->Record.MutableOperationId()->CopyFrom(record.GetOperationId());
Result->Record.SetOrigin(Self->TabletID());
@@ -36,18 +36,18 @@ public:
}
NIceDb::TNiceDb db(txc.DB);
+
const auto rid = Self->SysParams.AllocateReplicationId(db);
+ CLOG_N(ctx, "Add replication"
+ << ": rid# " << rid
+ << ", pathId# " << pathId);
- Replication = Self->Add(rid, pathId, record.GetConfig());
db.Table<Schema::Replications>().Key(rid).Update(
NIceDb::TUpdate<Schema::Replications::PathOwnerId>(pathId.OwnerId),
NIceDb::TUpdate<Schema::Replications::PathLocalId>(pathId.LocalPathId),
NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString())
);
-
- CLOG_N(ctx, "Add replication"
- << ": rid# " << rid
- << ", pathId# " << pathId);
+ Replication = Self->Add(rid, pathId, std::move(*record.MutableConfig()));
Result->Record.SetStatus(NKikimrReplication::TEvCreateReplicationResult::SUCCESS);
return true;
diff --git a/ydb/core/tx/replication/controller/tx_discovery_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
index 686de8fb261..686de8fb261 100644
--- a/ydb/core/tx/replication/controller/tx_discovery_result.cpp
+++ b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
diff --git a/ydb/core/tx/replication/service/CMakeLists.darwin.txt b/ydb/core/tx/replication/service/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..ce070b006e1
--- /dev/null
+++ b/ydb/core/tx/replication/service/CMakeLists.darwin.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-replication-service)
+target_compile_options(tx-replication-service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(tx-replication-service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+)
+target_sources(tx-replication-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/service/service.cpp
+)
diff --git a/ydb/core/tx/replication/service/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/service/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..68c0eddf13d
--- /dev/null
+++ b/ydb/core/tx/replication/service/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-replication-service)
+target_compile_options(tx-replication-service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(tx-replication-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+)
+target_sources(tx-replication-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/service/service.cpp
+)
diff --git a/ydb/core/tx/replication/service/CMakeLists.linux.txt b/ydb/core/tx/replication/service/CMakeLists.linux.txt
new file mode 100644
index 00000000000..68c0eddf13d
--- /dev/null
+++ b/ydb/core/tx/replication/service/CMakeLists.linux.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-replication-service)
+target_compile_options(tx-replication-service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(tx-replication-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+)
+target_sources(tx-replication-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/service/service.cpp
+)
diff --git a/ydb/core/tx/replication/service/CMakeLists.txt b/ydb/core/tx/replication/service/CMakeLists.txt
new file mode 100644
index 00000000000..5bb4faffb40
--- /dev/null
+++ b/ydb/core/tx/replication/service/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp
new file mode 100644
index 00000000000..bc9489a71d7
--- /dev/null
+++ b/ydb/core/tx/replication/service/service.cpp
@@ -0,0 +1,63 @@
+#include "service.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/path.h>
+#include <ydb/core/base/statestorage.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NKikimr::NReplication {
+
+namespace NService {
+
+class TReplicationService: public TActorBootstrapped<TReplicationService> {
+ void RunBoardPublisher() {
+ const auto& tenant = AppData()->TenantName;
+
+ auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(ExtractDomain(tenant));
+ if (!domainInfo) {
+ return PassAway();
+ }
+
+ const auto boardPath = MakeDiscoveryPath(tenant);
+ const auto groupId = domainInfo->DefaultStateStorageGroup;
+ BoardPublisher = Register(CreateBoardPublishActor(boardPath, TString(), SelfId(), groupId, 0, true));
+ }
+
+ void PassAway() override {
+ if (auto actorId = std::exchange(BoardPublisher, {})) {
+ Send(actorId, new TEvents::TEvPoison());
+ }
+
+ TActorBootstrapped<TReplicationService>::PassAway();
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::REPLICATION_SERVICE;
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateWork);
+ RunBoardPublisher();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ TActorId BoardPublisher;
+
+}; // TReplicationService
+
+} // NService
+
+IActor* CreateReplicationService() {
+ return new NService::TReplicationService();
+}
+
+}
diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h
new file mode 100644
index 00000000000..fb113e2d61c
--- /dev/null
+++ b/ydb/core/tx/replication/service/service.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+
+namespace NKikimr::NReplication {
+
+namespace NService {
+
+inline TString MakeDiscoveryPath(const TString& tenant) {
+ return "rs+" + tenant;
+}
+
+} // NService
+
+inline TActorId MakeReplicationServiceId(ui32 nodeId) {
+ return TActorId(nodeId, TStringBuf("ReplictnSvc"));
+}
+
+IActor* CreateReplicationService();
+
+}