diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-20 16:59:25 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-20 16:59:25 +0300 |
commit | cf756298a93c80ff6bbc8bdab91ea8ce74fab994 (patch) | |
tree | c315ce249c8d070ab9bca3feee4b790fb51116ad | |
parent | c8b7f84d9c4a39d7f576931850f269c560906f2b (diff) | |
download | ydb-cf756298a93c80ff6bbc8bdab91ea8ce74fab994.tar.gz |
Create consumer
3 files changed, 63 insertions, 5 deletions
diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index e98686c978f..85b17800d96 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -1,11 +1,13 @@ #include "logging.h" #include "private_events.h" #include "stream_creator.h" +#include "target_with_stream.h" #include "util.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> +#include <ydb/core/base/path.h> #include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h> namespace NKikimr::NReplication::NController { @@ -25,14 +27,15 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> { break; } - Become(&TThis::StateWork); + Become(&TThis::StateCreateStream); } - STATEFN(StateWork) { + STATEFN(StateCreateStream) { switch (ev->GetTypeRewrite()) { hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle); sFunc(TEvents::TEvWakeup, CreateStream); - sFunc(TEvents::TEvPoison, PassAway); + default: + return StateBase(ev, TlsActivationContext->AsActorContext()); } } @@ -42,7 +45,48 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> { if (!result.IsSuccess()) { if (IsRetryableError(result)) { - LOG_D("Retry"); + LOG_D("Retry CreateStream"); + return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup); + } + + LOG_E("Error" + << ": status# " << result.GetStatus() + << ", issues# " << result.GetIssues().ToOneLineString()); + return Reply(std::move(result)); + } else { + LOG_I("Success" + << ": issues# " << result.GetIssues().ToOneLineString()); + return CreateConsumer(); + } + } + + void CreateConsumer() { + const auto streamPath = CanonizePath(ChildPath(SplitPath(SrcPath), Changefeed.GetName())); + const auto settings = NYdb::NTopic::TAlterTopicSettings() + .BeginAddConsumer() + .ConsumerName(ReplicationConsumerName) + .EndAddConsumer(); + + Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(streamPath, settings)); + Become(&TThis::StateCreateConsumer); + } + + STATEFN(StateCreateConsumer) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvAlterTopicResponse, Handle); + sFunc(TEvents::TEvWakeup, CreateConsumer); + default: + return StateBase(ev, TlsActivationContext->AsActorContext()); + } + } + + void Handle(TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + auto& result = ev->Get()->Result; + + if (!result.IsSuccess()) { + if (IsRetryableError(result)) { + LOG_D("Retry CreateConsumer"); return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup); } @@ -54,7 +98,11 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> { << ": issues# " << result.GetIssues().ToOneLineString()); } - Send(Parent, new TEvPrivate::TEvCreateStreamResult(ReplicationId, TargetId, std::move(result))); + Reply(std::move(result)); + } + + void Reply(NYdb::TStatus&& status) { + Send(Parent, new TEvPrivate::TEvCreateStreamResult(ReplicationId, TargetId, std::move(status))); PassAway(); } @@ -86,6 +134,12 @@ public: CreateStream(); } + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoison, PassAway); + } + } + private: const TActorId Parent; const TActorId YdbProxy; diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp index 63296311f83..6f1fbf15955 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.cpp +++ b/ydb/core/tx/replication/controller/target_with_stream.cpp @@ -7,6 +7,8 @@ namespace NKikimr::NReplication::NController { +const TString ReplicationConsumerName = "replicationConsumer"; + void TTargetWithStream::Progress(ui64 schemeShardId, const TActorId& proxy, const TActorContext& ctx) { switch (GetStreamState()) { case EStreamState::Creating: diff --git a/ydb/core/tx/replication/controller/target_with_stream.h b/ydb/core/tx/replication/controller/target_with_stream.h index 9058a56908d..8dd5bf590af 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.h +++ b/ydb/core/tx/replication/controller/target_with_stream.h @@ -4,6 +4,8 @@ namespace NKikimr::NReplication::NController { +extern const TString ReplicationConsumerName; + class TTargetWithStream: public TTargetBase { public: template <typename... Args> |