aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-03-04 18:17:53 +0500
committerGitHub <noreply@github.com>2025-03-04 13:17:53 +0000
commit29d6dd36f36aeeafb080f82222de46d7f1eb8a1c (patch)
treea957fb78b7ba14959781f05f463da256a6b56ca3
parent4f16affd6c83c13cfb5be7b28572c6a307b14098 (diff)
downloadydb-29d6dd36f36aeeafb080f82222de46d7f1eb8a1c.tar.gz
Move TTargetTransfer to target_transfer.h (#15269)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
-rw-r--r--ydb/core/tx/replication/controller/event_util.cpp2
-rw-r--r--ydb/core/tx/replication/controller/replication.cpp1
-rw-r--r--ydb/core/tx/replication/controller/target_discoverer.cpp1
-rw-r--r--ydb/core/tx/replication/controller/target_discoverer_ut.cpp1
-rw-r--r--ydb/core/tx/replication/controller/target_table.cpp165
-rw-r--r--ydb/core/tx/replication/controller/target_table.h29
-rw-r--r--ydb/core/tx/replication/controller/target_transfer.cpp68
-rw-r--r--ydb/core/tx/replication/controller/target_transfer.h34
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.cpp110
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.h2
-rw-r--r--ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp3
-rw-r--r--ydb/core/tx/replication/controller/tx_init.cpp1
-rw-r--r--ydb/core/tx/replication/controller/ya.make1
13 files changed, 221 insertions, 197 deletions
diff --git a/ydb/core/tx/replication/controller/event_util.cpp b/ydb/core/tx/replication/controller/event_util.cpp
index 77080559a7b..9b685b9d5ed 100644
--- a/ydb/core/tx/replication/controller/event_util.cpp
+++ b/ydb/core/tx/replication/controller/event_util.cpp
@@ -1,5 +1,5 @@
#include "event_util.h"
-#include "target_table.h"
+#include "target_transfer.h"
namespace NKikimr::NReplication::NController {
diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp
index b0417987f82..a22bc44c907 100644
--- a/ydb/core/tx/replication/controller/replication.cpp
+++ b/ydb/core/tx/replication/controller/replication.cpp
@@ -4,6 +4,7 @@
#include "secret_resolver.h"
#include "target_discoverer.h"
#include "target_table.h"
+#include "target_transfer.h"
#include "tenant_resolver.h"
#include "util.h"
diff --git a/ydb/core/tx/replication/controller/target_discoverer.cpp b/ydb/core/tx/replication/controller/target_discoverer.cpp
index c54547166ef..b9fd9adf25a 100644
--- a/ydb/core/tx/replication/controller/target_discoverer.cpp
+++ b/ydb/core/tx/replication/controller/target_discoverer.cpp
@@ -2,6 +2,7 @@
#include "private_events.h"
#include "target_discoverer.h"
#include "target_table.h"
+#include "target_transfer.h"
#include "util.h"
#include <ydb/core/base/path.h>
diff --git a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp
index 72e4a43c327..8808167eeb2 100644
--- a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp
+++ b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp
@@ -1,6 +1,7 @@
#include "private_events.h"
#include "target_discoverer.h"
#include "target_table.h"
+#include "target_transfer.h"
#include <ydb/core/tx/replication/ut_helpers/test_env.h>
#include <ydb/core/tx/replication/ut_helpers/test_table.h>
diff --git a/ydb/core/tx/replication/controller/target_table.cpp b/ydb/core/tx/replication/controller/target_table.cpp
index f76058250df..8864db28fa7 100644
--- a/ydb/core/tx/replication/controller/target_table.cpp
+++ b/ydb/core/tx/replication/controller/target_table.cpp
@@ -1,108 +1,9 @@
-#include "event_util.h"
-#include "logging.h"
-#include "stream_consumer_remover.h"
#include "target_table.h"
-#include "util.h"
#include <ydb/core/base/path.h>
-#include <ydb/core/scheme/scheme_pathid.h>
-#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-#include <ydb/library/actors/core/hfunc.h>
namespace NKikimr::NReplication::NController {
-class TTableWorkerRegistar: public TActorBootstrapped<TTableWorkerRegistar> {
- void Handle(TEvYdbProxy::TEvDescribeTopicResponse::TPtr& ev) {
- LOG_T("Handle " << ev->Get()->ToString());
-
- const auto& result = ev->Get()->Result;
- if (!result.IsSuccess()) {
- if (IsRetryableError(result)) {
- LOG_W("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Retry.");
- return Retry();
- }
-
- LOG_E("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Stop.");
- return; // TODO: hard error
- }
-
- for (const auto& partition : result.GetTopicDescription().GetPartitions()) {
- if (!partition.GetParentPartitionIds().empty()) {
- continue;
- }
-
- auto ev = MakeRunWorkerEv(
- ReplicationId, TargetId, Config, partition.GetPartitionId(),
- ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId);
- Send(Parent, std::move(ev));
- }
-
- PassAway();
- }
-
- void Retry() {
- LOG_D("Retry");
- Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
- }
-
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR;
- }
-
- explicit TTableWorkerRegistar(
- const TActorId& parent,
- const TActorId& proxy,
- const NKikimrReplication::TConnectionParams& connectionParams,
- const NKikimrReplication::TConsistencySettings& consistencySettings,
- ui64 rid,
- ui64 tid,
- const TString& srcStreamPath,
- const TString& srcStreamConsumerName,
- const TPathId& dstPathId,
- const TReplication::ITarget::IConfig::TPtr& config)
- : Parent(parent)
- , YdbProxy(proxy)
- , ConnectionParams(connectionParams)
- , ConsistencySettings(consistencySettings)
- , ReplicationId(rid)
- , TargetId(tid)
- , SrcStreamPath(srcStreamPath)
- , SrcStreamConsumerName(srcStreamConsumerName)
- , DstPathId(dstPathId)
- , LogPrefix("TableWorkerRegistar", ReplicationId, TargetId)
- , Config(config)
- {
- }
-
- void Bootstrap() {
- Become(&TThis::StateWork);
- Send(YdbProxy, new TEvYdbProxy::TEvDescribeTopicRequest(SrcStreamPath, {}));
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvYdbProxy::TEvDescribeTopicResponse, Handle);
- sFunc(TEvents::TEvWakeup, Bootstrap);
- sFunc(TEvents::TEvPoison, PassAway);
- }
- }
-
-private:
- const TActorId Parent;
- const TActorId YdbProxy;
- const NKikimrReplication::TConnectionParams ConnectionParams;
- const NKikimrReplication::TConsistencySettings ConsistencySettings;
- const ui64 ReplicationId;
- const ui64 TargetId;
- const TString SrcStreamPath;
- const TString SrcStreamConsumerName;
- const TPathId DstPathId;
- const TActorLogPrefix LogPrefix;
- const TReplication::ITarget::IConfig::TPtr Config;
-
-}; // TTableWorkerRegistar
TTargetTableBase::TTargetTableBase(TReplication* replication, ETargetKind finalKind,
ui64 id, const IConfig::TPtr& config)
@@ -110,14 +11,6 @@ TTargetTableBase::TTargetTableBase(TReplication* replication, ETargetKind finalK
{
}
-IActor* TTargetTableBase::CreateWorkerRegistar(const TActorContext& ctx) const {
- auto replication = GetReplication();
- const auto& config = replication->GetConfig();
- return new TTableWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(),
- config.GetSrcConnectionParams(), config.GetConsistencySettings(),
- replication->GetId(), GetId(), BuildStreamPath(), GetStreamConsumerName(), GetDstPathId(), GetConfig());
-}
-
TTargetTable::TTargetTable(TReplication* replication, ui64 id, const IConfig::TPtr& config)
: TTargetTableBase(replication, ETargetKind::Table, id, config)
{
@@ -140,62 +33,4 @@ TString TTargetIndexTable::BuildStreamPath() const {
return CanonizePath(ChildPath(SplitPath(GetSrcPath()), {"indexImplTable", GetStreamName()}));
}
-TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config)
- : TTargetTableBase(replication, ETargetKind::Transfer, id, config)
-{
-}
-
-void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
- auto& t = cfg.GetTransferSpecific().GetTargets(0);
- Config = std::make_shared<TTargetTransfer::TTransferConfig>(
- GetConfig()->GetSrcPath(),
- GetConfig()->GetDstPath(),
- t.GetTransformLambda());
-}
-
-void TTargetTransfer::Progress(const TActorContext& ctx) {
- auto replication = GetReplication();
-
- switch (GetStreamState()) {
- case EStreamState::Removing:
- if (GetWorkers()) {
- RemoveWorkers(ctx);
- } else if (!StreamConsumerRemover) {
- StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx));
- }
- return;
- case EStreamState::Creating:
- case EStreamState::Ready:
- case EStreamState::Removed:
- case EStreamState::Error:
- break;
- }
-
- TTargetWithStream::Progress(ctx);
-}
-
-void TTargetTransfer::Shutdown(const TActorContext& ctx) {
- for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) {
- if (auto actorId = std::exchange(*x, {})) {
- ctx.Send(actorId, new TEvents::TEvPoison());
- }
- }
-
- TTargetWithStream::Shutdown(ctx);
-}
-
-TString TTargetTransfer::BuildStreamPath() const {
- return CanonizePath(GetSrcPath());
-}
-
-TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda)
- : TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
- , TransformLambda(transformLambda)
-{
-}
-
-const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const {
- return TransformLambda;
-}
-
}
diff --git a/ydb/core/tx/replication/controller/target_table.h b/ydb/core/tx/replication/controller/target_table.h
index 23c70dff7cd..92f27e5ca92 100644
--- a/ydb/core/tx/replication/controller/target_table.h
+++ b/ydb/core/tx/replication/controller/target_table.h
@@ -12,7 +12,6 @@ public:
TString GetStreamPath() const override;
protected:
- IActor* CreateWorkerRegistar(const TActorContext& ctx) const override;
virtual TString BuildStreamPath() const = 0;
};
@@ -50,32 +49,4 @@ protected:
TString BuildStreamPath() const override;
};
-class TTargetTransfer: public TTargetTableBase {
-public:
- struct TTransferConfig : public TConfigBase {
- using TPtr = std::shared_ptr<TTransferConfig>;
-
- TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda);
-
- const TString& GetTransformLambda() const;
-
- private:
- TString TransformLambda;
- };
-
- explicit TTargetTransfer(TReplication* replication,
- ui64 id, const IConfig::TPtr& config);
-
- void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
-
- void Progress(const TActorContext& ctx) override;
- void Shutdown(const TActorContext& ctx) override;
-
-protected:
- TString BuildStreamPath() const override;
-
-private:
- TActorId StreamConsumerRemover;
-};
-
}
diff --git a/ydb/core/tx/replication/controller/target_transfer.cpp b/ydb/core/tx/replication/controller/target_transfer.cpp
new file mode 100644
index 00000000000..8f8a392a21d
--- /dev/null
+++ b/ydb/core/tx/replication/controller/target_transfer.cpp
@@ -0,0 +1,68 @@
+#include "stream_consumer_remover.h"
+#include "target_transfer.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/tx/replication/service/service.h>
+#include <ydb/library/actors/core/events.h>
+
+namespace NKikimr::NReplication::NController {
+
+TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config)
+ : TTargetWithStream(replication, ETargetKind::Transfer, id, config)
+{
+}
+
+void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
+ auto& t = cfg.GetTransferSpecific().GetTargets(0);
+ Config = std::make_shared<TTargetTransfer::TTransferConfig>(
+ GetConfig()->GetSrcPath(),
+ GetConfig()->GetDstPath(),
+ t.GetTransformLambda());
+}
+
+void TTargetTransfer::Progress(const TActorContext& ctx) {
+ auto replication = GetReplication();
+
+ switch (GetStreamState()) {
+ case EStreamState::Removing:
+ if (GetWorkers()) {
+ RemoveWorkers(ctx);
+ } else if (!StreamConsumerRemover) {
+ StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx));
+ }
+ return;
+ case EStreamState::Creating:
+ case EStreamState::Ready:
+ case EStreamState::Removed:
+ case EStreamState::Error:
+ break;
+ }
+
+ TTargetWithStream::Progress(ctx);
+}
+
+void TTargetTransfer::Shutdown(const TActorContext& ctx) {
+ for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) {
+ if (auto actorId = std::exchange(*x, {})) {
+ ctx.Send(actorId, new TEvents::TEvPoison());
+ }
+ }
+
+ TTargetWithStream::Shutdown(ctx);
+}
+
+TString TTargetTransfer::GetStreamPath() const {
+ return CanonizePath(GetSrcPath());
+}
+
+TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda)
+ : TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
+ , TransformLambda(transformLambda)
+{
+}
+
+const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const {
+ return TransformLambda;
+}
+
+}
diff --git a/ydb/core/tx/replication/controller/target_transfer.h b/ydb/core/tx/replication/controller/target_transfer.h
new file mode 100644
index 00000000000..d45818ee8d0
--- /dev/null
+++ b/ydb/core/tx/replication/controller/target_transfer.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include "target_with_stream.h"
+
+namespace NKikimr::NReplication::NController {
+
+class TTargetTransfer: public TTargetWithStream {
+public:
+ struct TTransferConfig : public TConfigBase {
+ using TPtr = std::shared_ptr<TTransferConfig>;
+
+ TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda);
+
+ const TString& GetTransformLambda() const;
+
+ private:
+ TString TransformLambda;
+ };
+
+ explicit TTargetTransfer(TReplication* replication,
+ ui64 id, const IConfig::TPtr& config);
+
+ void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
+
+ void Progress(const TActorContext& ctx) override;
+ void Shutdown(const TActorContext& ctx) override;
+
+ TString GetStreamPath() const override;
+
+private:
+ TActorId StreamConsumerRemover;
+};
+
+}
diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp
index 96145c6783b..20ee070b78d 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.cpp
+++ b/ydb/core/tx/replication/controller/target_with_stream.cpp
@@ -1,14 +1,116 @@
+#include "event_util.h"
+#include "logging.h"
#include "private_events.h"
#include "stream_creator.h"
#include "stream_remover.h"
#include "target_with_stream.h"
+#include "util.h"
+#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/hfunc.h>
namespace NKikimr::NReplication::NController {
const TString ReplicationConsumerName = "replicationConsumer";
+namespace {
+
+class TWorkerRegistar: public TActorBootstrapped<TWorkerRegistar> {
+ void Handle(TEvYdbProxy::TEvDescribeTopicResponse::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+
+ const auto& result = ev->Get()->Result;
+ if (!result.IsSuccess()) {
+ if (IsRetryableError(result)) {
+ LOG_W("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Retry.");
+ return Retry();
+ }
+
+ LOG_E("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Stop.");
+ return; // TODO: hard error
+ }
+
+ for (const auto& partition : result.GetTopicDescription().GetPartitions()) {
+ if (!partition.GetParentPartitionIds().empty()) {
+ continue;
+ }
+
+ auto ev = MakeRunWorkerEv(
+ ReplicationId, TargetId, Config, partition.GetPartitionId(),
+ ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId);
+ Send(Parent, std::move(ev));
+ }
+
+ PassAway();
+ }
+
+ void Retry() {
+ LOG_D("Retry");
+ Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR;
+ }
+
+ explicit TWorkerRegistar(
+ const TActorId& parent,
+ const TActorId& proxy,
+ const NKikimrReplication::TConnectionParams& connectionParams,
+ const NKikimrReplication::TConsistencySettings& consistencySettings,
+ ui64 rid,
+ ui64 tid,
+ const TString& srcStreamPath,
+ const TString& srcStreamConsumerName,
+ const TPathId& dstPathId,
+ const TReplication::ITarget::IConfig::TPtr& config)
+ : Parent(parent)
+ , YdbProxy(proxy)
+ , ConnectionParams(connectionParams)
+ , ConsistencySettings(consistencySettings)
+ , ReplicationId(rid)
+ , TargetId(tid)
+ , SrcStreamPath(srcStreamPath)
+ , SrcStreamConsumerName(srcStreamConsumerName)
+ , DstPathId(dstPathId)
+ , LogPrefix("TableWorkerRegistar", ReplicationId, TargetId)
+ , Config(config)
+ {
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateWork);
+ Send(YdbProxy, new TEvYdbProxy::TEvDescribeTopicRequest(SrcStreamPath, {}));
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvYdbProxy::TEvDescribeTopicResponse, Handle);
+ sFunc(TEvents::TEvWakeup, Bootstrap);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ const TActorId Parent;
+ const TActorId YdbProxy;
+ const NKikimrReplication::TConnectionParams ConnectionParams;
+ const NKikimrReplication::TConsistencySettings ConsistencySettings;
+ const ui64 ReplicationId;
+ const ui64 TargetId;
+ const TString SrcStreamPath;
+ const TString SrcStreamConsumerName;
+ const TPathId DstPathId;
+ const TActorLogPrefix LogPrefix;
+ const TReplication::ITarget::IConfig::TPtr Config;
+
+}; // TWorkerRegistar
+
+} // namespace
+
void TTargetWithStream::Progress(const TActorContext& ctx) {
auto replication = GetReplication();
@@ -47,4 +149,12 @@ void TTargetWithStream::Shutdown(const TActorContext& ctx) {
TTargetBase::Shutdown(ctx);
}
+IActor* TTargetWithStream::CreateWorkerRegistar(const TActorContext& ctx) const {
+ auto replication = GetReplication();
+ const auto& config = replication->GetConfig();
+ return new TWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(),
+ config.GetSrcConnectionParams(), config.GetConsistencySettings(),
+ replication->GetId(), GetId(), GetStreamPath(), GetStreamConsumerName(), GetDstPathId(), GetConfig());
+}
+
}
diff --git a/ydb/core/tx/replication/controller/target_with_stream.h b/ydb/core/tx/replication/controller/target_with_stream.h
index 06a72aada18..02998940efd 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.h
+++ b/ydb/core/tx/replication/controller/target_with_stream.h
@@ -18,6 +18,8 @@ public:
void Progress(const TActorContext& ctx) override;
void Shutdown(const TActorContext& ctx) override;
+ IActor* CreateWorkerRegistar(const TActorContext& ctx) const override;
+
private:
bool NameAssignmentInProcess = false;
TActorId StreamCreator;
diff --git a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
index cd247e2b5dd..48d37a57b9f 100644
--- a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
+++ b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
@@ -1,6 +1,5 @@
#include "controller_impl.h"
-#include "target_table.h"
-#include "util.h"
+#include "target_transfer.h"
#include <util/string/join.h>
diff --git a/ydb/core/tx/replication/controller/tx_init.cpp b/ydb/core/tx/replication/controller/tx_init.cpp
index 749dd1834d0..c6403af6182 100644
--- a/ydb/core/tx/replication/controller/tx_init.cpp
+++ b/ydb/core/tx/replication/controller/tx_init.cpp
@@ -1,5 +1,6 @@
#include "controller_impl.h"
#include "target_table.h"
+#include "target_transfer.h"
namespace NKikimr::NReplication::NController {
diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make
index 758383bf875..6c436ef7abd 100644
--- a/ydb/core/tx/replication/controller/ya.make
+++ b/ydb/core/tx/replication/controller/ya.make
@@ -38,6 +38,7 @@ SRCS(
target_base.cpp
target_discoverer.cpp
target_table.cpp
+ target_transfer.cpp
target_with_stream.cpp
tenant_resolver.cpp
tx_assign_tx_id.cpp