aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-20 16:59:25 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-20 16:59:25 +0300
commitcf756298a93c80ff6bbc8bdab91ea8ce74fab994 (patch)
treec315ce249c8d070ab9bca3feee4b790fb51116ad
parentc8b7f84d9c4a39d7f576931850f269c560906f2b (diff)
downloadydb-cf756298a93c80ff6bbc8bdab91ea8ce74fab994.tar.gz
Create consumer
-rw-r--r--ydb/core/tx/replication/controller/stream_creator.cpp64
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.cpp2
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.h2
3 files changed, 63 insertions, 5 deletions
diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp
index e98686c978f..85b17800d96 100644
--- a/ydb/core/tx/replication/controller/stream_creator.cpp
+++ b/ydb/core/tx/replication/controller/stream_creator.cpp
@@ -1,11 +1,13 @@
#include "logging.h"
#include "private_events.h"
#include "stream_creator.h"
+#include "target_with_stream.h"
#include "util.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <ydb/core/base/path.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
namespace NKikimr::NReplication::NController {
@@ -25,14 +27,15 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
break;
}
- Become(&TThis::StateWork);
+ Become(&TThis::StateCreateStream);
}
- STATEFN(StateWork) {
+ STATEFN(StateCreateStream) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle);
sFunc(TEvents::TEvWakeup, CreateStream);
- sFunc(TEvents::TEvPoison, PassAway);
+ default:
+ return StateBase(ev, TlsActivationContext->AsActorContext());
}
}
@@ -42,7 +45,48 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
if (!result.IsSuccess()) {
if (IsRetryableError(result)) {
- LOG_D("Retry");
+ LOG_D("Retry CreateStream");
+ return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
+ }
+
+ LOG_E("Error"
+ << ": status# " << result.GetStatus()
+ << ", issues# " << result.GetIssues().ToOneLineString());
+ return Reply(std::move(result));
+ } else {
+ LOG_I("Success"
+ << ": issues# " << result.GetIssues().ToOneLineString());
+ return CreateConsumer();
+ }
+ }
+
+ void CreateConsumer() {
+ const auto streamPath = CanonizePath(ChildPath(SplitPath(SrcPath), Changefeed.GetName()));
+ const auto settings = NYdb::NTopic::TAlterTopicSettings()
+ .BeginAddConsumer()
+ .ConsumerName(ReplicationConsumerName)
+ .EndAddConsumer();
+
+ Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(streamPath, settings));
+ Become(&TThis::StateCreateConsumer);
+ }
+
+ STATEFN(StateCreateConsumer) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvYdbProxy::TEvAlterTopicResponse, Handle);
+ sFunc(TEvents::TEvWakeup, CreateConsumer);
+ default:
+ return StateBase(ev, TlsActivationContext->AsActorContext());
+ }
+ }
+
+ void Handle(TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ auto& result = ev->Get()->Result;
+
+ if (!result.IsSuccess()) {
+ if (IsRetryableError(result)) {
+ LOG_D("Retry CreateConsumer");
return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
}
@@ -54,7 +98,11 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
<< ": issues# " << result.GetIssues().ToOneLineString());
}
- Send(Parent, new TEvPrivate::TEvCreateStreamResult(ReplicationId, TargetId, std::move(result)));
+ Reply(std::move(result));
+ }
+
+ void Reply(NYdb::TStatus&& status) {
+ Send(Parent, new TEvPrivate::TEvCreateStreamResult(ReplicationId, TargetId, std::move(status)));
PassAway();
}
@@ -86,6 +134,12 @@ public:
CreateStream();
}
+ STATEFN(StateBase) {
+ switch (ev->GetTypeRewrite()) {
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
private:
const TActorId Parent;
const TActorId YdbProxy;
diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp
index 63296311f83..6f1fbf15955 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.cpp
+++ b/ydb/core/tx/replication/controller/target_with_stream.cpp
@@ -7,6 +7,8 @@
namespace NKikimr::NReplication::NController {
+const TString ReplicationConsumerName = "replicationConsumer";
+
void TTargetWithStream::Progress(ui64 schemeShardId, const TActorId& proxy, const TActorContext& ctx) {
switch (GetStreamState()) {
case EStreamState::Creating:
diff --git a/ydb/core/tx/replication/controller/target_with_stream.h b/ydb/core/tx/replication/controller/target_with_stream.h
index 9058a56908d..8dd5bf590af 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.h
+++ b/ydb/core/tx/replication/controller/target_with_stream.h
@@ -4,6 +4,8 @@
namespace NKikimr::NReplication::NController {
+extern const TString ReplicationConsumerName;
+
class TTargetWithStream: public TTargetBase {
public:
template <typename... Args>