diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-03-04 18:17:53 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-04 13:17:53 +0000 |
commit | 29d6dd36f36aeeafb080f82222de46d7f1eb8a1c (patch) | |
tree | a957fb78b7ba14959781f05f463da256a6b56ca3 | |
parent | 4f16affd6c83c13cfb5be7b28572c6a307b14098 (diff) | |
download | ydb-29d6dd36f36aeeafb080f82222de46d7f1eb8a1c.tar.gz |
Move TTargetTransfer to target_transfer.h (#15269)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
13 files changed, 221 insertions, 197 deletions
diff --git a/ydb/core/tx/replication/controller/event_util.cpp b/ydb/core/tx/replication/controller/event_util.cpp index 77080559a7b..9b685b9d5ed 100644 --- a/ydb/core/tx/replication/controller/event_util.cpp +++ b/ydb/core/tx/replication/controller/event_util.cpp @@ -1,5 +1,5 @@ #include "event_util.h" -#include "target_table.h" +#include "target_transfer.h" namespace NKikimr::NReplication::NController { diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index b0417987f82..a22bc44c907 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -4,6 +4,7 @@ #include "secret_resolver.h" #include "target_discoverer.h" #include "target_table.h" +#include "target_transfer.h" #include "tenant_resolver.h" #include "util.h" diff --git a/ydb/core/tx/replication/controller/target_discoverer.cpp b/ydb/core/tx/replication/controller/target_discoverer.cpp index c54547166ef..b9fd9adf25a 100644 --- a/ydb/core/tx/replication/controller/target_discoverer.cpp +++ b/ydb/core/tx/replication/controller/target_discoverer.cpp @@ -2,6 +2,7 @@ #include "private_events.h" #include "target_discoverer.h" #include "target_table.h" +#include "target_transfer.h" #include "util.h" #include <ydb/core/base/path.h> diff --git a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp index 72e4a43c327..8808167eeb2 100644 --- a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp +++ b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp @@ -1,6 +1,7 @@ #include "private_events.h" #include "target_discoverer.h" #include "target_table.h" +#include "target_transfer.h" #include <ydb/core/tx/replication/ut_helpers/test_env.h> #include <ydb/core/tx/replication/ut_helpers/test_table.h> diff --git a/ydb/core/tx/replication/controller/target_table.cpp b/ydb/core/tx/replication/controller/target_table.cpp index f76058250df..8864db28fa7 100644 --- a/ydb/core/tx/replication/controller/target_table.cpp +++ b/ydb/core/tx/replication/controller/target_table.cpp @@ -1,108 +1,9 @@ -#include "event_util.h" -#include "logging.h" -#include "stream_consumer_remover.h" #include "target_table.h" -#include "util.h" #include <ydb/core/base/path.h> -#include <ydb/core/scheme/scheme_pathid.h> -#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h> -#include <ydb/library/actors/core/actor_bootstrapped.h> -#include <ydb/library/actors/core/hfunc.h> namespace NKikimr::NReplication::NController { -class TTableWorkerRegistar: public TActorBootstrapped<TTableWorkerRegistar> { - void Handle(TEvYdbProxy::TEvDescribeTopicResponse::TPtr& ev) { - LOG_T("Handle " << ev->Get()->ToString()); - - const auto& result = ev->Get()->Result; - if (!result.IsSuccess()) { - if (IsRetryableError(result)) { - LOG_W("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Retry."); - return Retry(); - } - - LOG_E("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Stop."); - return; // TODO: hard error - } - - for (const auto& partition : result.GetTopicDescription().GetPartitions()) { - if (!partition.GetParentPartitionIds().empty()) { - continue; - } - - auto ev = MakeRunWorkerEv( - ReplicationId, TargetId, Config, partition.GetPartitionId(), - ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId); - Send(Parent, std::move(ev)); - } - - PassAway(); - } - - void Retry() { - LOG_D("Retry"); - Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup()); - } - -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR; - } - - explicit TTableWorkerRegistar( - const TActorId& parent, - const TActorId& proxy, - const NKikimrReplication::TConnectionParams& connectionParams, - const NKikimrReplication::TConsistencySettings& consistencySettings, - ui64 rid, - ui64 tid, - const TString& srcStreamPath, - const TString& srcStreamConsumerName, - const TPathId& dstPathId, - const TReplication::ITarget::IConfig::TPtr& config) - : Parent(parent) - , YdbProxy(proxy) - , ConnectionParams(connectionParams) - , ConsistencySettings(consistencySettings) - , ReplicationId(rid) - , TargetId(tid) - , SrcStreamPath(srcStreamPath) - , SrcStreamConsumerName(srcStreamConsumerName) - , DstPathId(dstPathId) - , LogPrefix("TableWorkerRegistar", ReplicationId, TargetId) - , Config(config) - { - } - - void Bootstrap() { - Become(&TThis::StateWork); - Send(YdbProxy, new TEvYdbProxy::TEvDescribeTopicRequest(SrcStreamPath, {})); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvYdbProxy::TEvDescribeTopicResponse, Handle); - sFunc(TEvents::TEvWakeup, Bootstrap); - sFunc(TEvents::TEvPoison, PassAway); - } - } - -private: - const TActorId Parent; - const TActorId YdbProxy; - const NKikimrReplication::TConnectionParams ConnectionParams; - const NKikimrReplication::TConsistencySettings ConsistencySettings; - const ui64 ReplicationId; - const ui64 TargetId; - const TString SrcStreamPath; - const TString SrcStreamConsumerName; - const TPathId DstPathId; - const TActorLogPrefix LogPrefix; - const TReplication::ITarget::IConfig::TPtr Config; - -}; // TTableWorkerRegistar TTargetTableBase::TTargetTableBase(TReplication* replication, ETargetKind finalKind, ui64 id, const IConfig::TPtr& config) @@ -110,14 +11,6 @@ TTargetTableBase::TTargetTableBase(TReplication* replication, ETargetKind finalK { } -IActor* TTargetTableBase::CreateWorkerRegistar(const TActorContext& ctx) const { - auto replication = GetReplication(); - const auto& config = replication->GetConfig(); - return new TTableWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(), - config.GetSrcConnectionParams(), config.GetConsistencySettings(), - replication->GetId(), GetId(), BuildStreamPath(), GetStreamConsumerName(), GetDstPathId(), GetConfig()); -} - TTargetTable::TTargetTable(TReplication* replication, ui64 id, const IConfig::TPtr& config) : TTargetTableBase(replication, ETargetKind::Table, id, config) { @@ -140,62 +33,4 @@ TString TTargetIndexTable::BuildStreamPath() const { return CanonizePath(ChildPath(SplitPath(GetSrcPath()), {"indexImplTable", GetStreamName()})); } -TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config) - : TTargetTableBase(replication, ETargetKind::Transfer, id, config) -{ -} - -void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) { - auto& t = cfg.GetTransferSpecific().GetTargets(0); - Config = std::make_shared<TTargetTransfer::TTransferConfig>( - GetConfig()->GetSrcPath(), - GetConfig()->GetDstPath(), - t.GetTransformLambda()); -} - -void TTargetTransfer::Progress(const TActorContext& ctx) { - auto replication = GetReplication(); - - switch (GetStreamState()) { - case EStreamState::Removing: - if (GetWorkers()) { - RemoveWorkers(ctx); - } else if (!StreamConsumerRemover) { - StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx)); - } - return; - case EStreamState::Creating: - case EStreamState::Ready: - case EStreamState::Removed: - case EStreamState::Error: - break; - } - - TTargetWithStream::Progress(ctx); -} - -void TTargetTransfer::Shutdown(const TActorContext& ctx) { - for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) { - if (auto actorId = std::exchange(*x, {})) { - ctx.Send(actorId, new TEvents::TEvPoison()); - } - } - - TTargetWithStream::Shutdown(ctx); -} - -TString TTargetTransfer::BuildStreamPath() const { - return CanonizePath(GetSrcPath()); -} - -TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda) - : TConfigBase(ETargetKind::Transfer, srcPath, dstPath) - , TransformLambda(transformLambda) -{ -} - -const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const { - return TransformLambda; -} - } diff --git a/ydb/core/tx/replication/controller/target_table.h b/ydb/core/tx/replication/controller/target_table.h index 23c70dff7cd..92f27e5ca92 100644 --- a/ydb/core/tx/replication/controller/target_table.h +++ b/ydb/core/tx/replication/controller/target_table.h @@ -12,7 +12,6 @@ public: TString GetStreamPath() const override; protected: - IActor* CreateWorkerRegistar(const TActorContext& ctx) const override; virtual TString BuildStreamPath() const = 0; }; @@ -50,32 +49,4 @@ protected: TString BuildStreamPath() const override; }; -class TTargetTransfer: public TTargetTableBase { -public: - struct TTransferConfig : public TConfigBase { - using TPtr = std::shared_ptr<TTransferConfig>; - - TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda); - - const TString& GetTransformLambda() const; - - private: - TString TransformLambda; - }; - - explicit TTargetTransfer(TReplication* replication, - ui64 id, const IConfig::TPtr& config); - - void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override; - - void Progress(const TActorContext& ctx) override; - void Shutdown(const TActorContext& ctx) override; - -protected: - TString BuildStreamPath() const override; - -private: - TActorId StreamConsumerRemover; -}; - } diff --git a/ydb/core/tx/replication/controller/target_transfer.cpp b/ydb/core/tx/replication/controller/target_transfer.cpp new file mode 100644 index 00000000000..8f8a392a21d --- /dev/null +++ b/ydb/core/tx/replication/controller/target_transfer.cpp @@ -0,0 +1,68 @@ +#include "stream_consumer_remover.h" +#include "target_transfer.h" + +#include <ydb/core/base/path.h> +#include <ydb/core/tx/replication/service/service.h> +#include <ydb/library/actors/core/events.h> + +namespace NKikimr::NReplication::NController { + +TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config) + : TTargetWithStream(replication, ETargetKind::Transfer, id, config) +{ +} + +void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) { + auto& t = cfg.GetTransferSpecific().GetTargets(0); + Config = std::make_shared<TTargetTransfer::TTransferConfig>( + GetConfig()->GetSrcPath(), + GetConfig()->GetDstPath(), + t.GetTransformLambda()); +} + +void TTargetTransfer::Progress(const TActorContext& ctx) { + auto replication = GetReplication(); + + switch (GetStreamState()) { + case EStreamState::Removing: + if (GetWorkers()) { + RemoveWorkers(ctx); + } else if (!StreamConsumerRemover) { + StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx)); + } + return; + case EStreamState::Creating: + case EStreamState::Ready: + case EStreamState::Removed: + case EStreamState::Error: + break; + } + + TTargetWithStream::Progress(ctx); +} + +void TTargetTransfer::Shutdown(const TActorContext& ctx) { + for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) { + if (auto actorId = std::exchange(*x, {})) { + ctx.Send(actorId, new TEvents::TEvPoison()); + } + } + + TTargetWithStream::Shutdown(ctx); +} + +TString TTargetTransfer::GetStreamPath() const { + return CanonizePath(GetSrcPath()); +} + +TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda) + : TConfigBase(ETargetKind::Transfer, srcPath, dstPath) + , TransformLambda(transformLambda) +{ +} + +const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const { + return TransformLambda; +} + +} diff --git a/ydb/core/tx/replication/controller/target_transfer.h b/ydb/core/tx/replication/controller/target_transfer.h new file mode 100644 index 00000000000..d45818ee8d0 --- /dev/null +++ b/ydb/core/tx/replication/controller/target_transfer.h @@ -0,0 +1,34 @@ +#pragma once + +#include "target_with_stream.h" + +namespace NKikimr::NReplication::NController { + +class TTargetTransfer: public TTargetWithStream { +public: + struct TTransferConfig : public TConfigBase { + using TPtr = std::shared_ptr<TTransferConfig>; + + TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda); + + const TString& GetTransformLambda() const; + + private: + TString TransformLambda; + }; + + explicit TTargetTransfer(TReplication* replication, + ui64 id, const IConfig::TPtr& config); + + void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override; + + void Progress(const TActorContext& ctx) override; + void Shutdown(const TActorContext& ctx) override; + + TString GetStreamPath() const override; + +private: + TActorId StreamConsumerRemover; +}; + +} diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp index 96145c6783b..20ee070b78d 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.cpp +++ b/ydb/core/tx/replication/controller/target_with_stream.cpp @@ -1,14 +1,116 @@ +#include "event_util.h" +#include "logging.h" #include "private_events.h" #include "stream_creator.h" #include "stream_remover.h" #include "target_with_stream.h" +#include "util.h" +#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h> +#include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/events.h> +#include <ydb/library/actors/core/hfunc.h> namespace NKikimr::NReplication::NController { const TString ReplicationConsumerName = "replicationConsumer"; +namespace { + +class TWorkerRegistar: public TActorBootstrapped<TWorkerRegistar> { + void Handle(TEvYdbProxy::TEvDescribeTopicResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + const auto& result = ev->Get()->Result; + if (!result.IsSuccess()) { + if (IsRetryableError(result)) { + LOG_W("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Retry."); + return Retry(); + } + + LOG_E("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Stop."); + return; // TODO: hard error + } + + for (const auto& partition : result.GetTopicDescription().GetPartitions()) { + if (!partition.GetParentPartitionIds().empty()) { + continue; + } + + auto ev = MakeRunWorkerEv( + ReplicationId, TargetId, Config, partition.GetPartitionId(), + ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId); + Send(Parent, std::move(ev)); + } + + PassAway(); + } + + void Retry() { + LOG_D("Retry"); + Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup()); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR; + } + + explicit TWorkerRegistar( + const TActorId& parent, + const TActorId& proxy, + const NKikimrReplication::TConnectionParams& connectionParams, + const NKikimrReplication::TConsistencySettings& consistencySettings, + ui64 rid, + ui64 tid, + const TString& srcStreamPath, + const TString& srcStreamConsumerName, + const TPathId& dstPathId, + const TReplication::ITarget::IConfig::TPtr& config) + : Parent(parent) + , YdbProxy(proxy) + , ConnectionParams(connectionParams) + , ConsistencySettings(consistencySettings) + , ReplicationId(rid) + , TargetId(tid) + , SrcStreamPath(srcStreamPath) + , SrcStreamConsumerName(srcStreamConsumerName) + , DstPathId(dstPathId) + , LogPrefix("TableWorkerRegistar", ReplicationId, TargetId) + , Config(config) + { + } + + void Bootstrap() { + Become(&TThis::StateWork); + Send(YdbProxy, new TEvYdbProxy::TEvDescribeTopicRequest(SrcStreamPath, {})); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvDescribeTopicResponse, Handle); + sFunc(TEvents::TEvWakeup, Bootstrap); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Parent; + const TActorId YdbProxy; + const NKikimrReplication::TConnectionParams ConnectionParams; + const NKikimrReplication::TConsistencySettings ConsistencySettings; + const ui64 ReplicationId; + const ui64 TargetId; + const TString SrcStreamPath; + const TString SrcStreamConsumerName; + const TPathId DstPathId; + const TActorLogPrefix LogPrefix; + const TReplication::ITarget::IConfig::TPtr Config; + +}; // TWorkerRegistar + +} // namespace + void TTargetWithStream::Progress(const TActorContext& ctx) { auto replication = GetReplication(); @@ -47,4 +149,12 @@ void TTargetWithStream::Shutdown(const TActorContext& ctx) { TTargetBase::Shutdown(ctx); } +IActor* TTargetWithStream::CreateWorkerRegistar(const TActorContext& ctx) const { + auto replication = GetReplication(); + const auto& config = replication->GetConfig(); + return new TWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(), + config.GetSrcConnectionParams(), config.GetConsistencySettings(), + replication->GetId(), GetId(), GetStreamPath(), GetStreamConsumerName(), GetDstPathId(), GetConfig()); +} + } diff --git a/ydb/core/tx/replication/controller/target_with_stream.h b/ydb/core/tx/replication/controller/target_with_stream.h index 06a72aada18..02998940efd 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.h +++ b/ydb/core/tx/replication/controller/target_with_stream.h @@ -18,6 +18,8 @@ public: void Progress(const TActorContext& ctx) override; void Shutdown(const TActorContext& ctx) override; + IActor* CreateWorkerRegistar(const TActorContext& ctx) const override; + private: bool NameAssignmentInProcess = false; TActorId StreamCreator; diff --git a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp index cd247e2b5dd..48d37a57b9f 100644 --- a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp +++ b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp @@ -1,6 +1,5 @@ #include "controller_impl.h" -#include "target_table.h" -#include "util.h" +#include "target_transfer.h" #include <util/string/join.h> diff --git a/ydb/core/tx/replication/controller/tx_init.cpp b/ydb/core/tx/replication/controller/tx_init.cpp index 749dd1834d0..c6403af6182 100644 --- a/ydb/core/tx/replication/controller/tx_init.cpp +++ b/ydb/core/tx/replication/controller/tx_init.cpp @@ -1,5 +1,6 @@ #include "controller_impl.h" #include "target_table.h" +#include "target_transfer.h" namespace NKikimr::NReplication::NController { diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index 758383bf875..6c436ef7abd 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -38,6 +38,7 @@ SRCS( target_base.cpp target_discoverer.cpp target_table.cpp + target_transfer.cpp target_with_stream.cpp tenant_resolver.cpp tx_assign_tx_id.cpp |