aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-02-08 14:17:51 +0300
committerilnaz <ilnaz@ydb.tech>2023-02-08 14:17:51 +0300
commitadf98907aa45c85b4c57dcd2b2e85c2c8ad21dbb (patch)
tree97bd4e71001ef86a26ada89ac826e9c20a873e5a
parent63721d79509d1f5bcffac38a85c0dfc4cb084893 (diff)
downloadydb-adf98907aa45c85b4c57dcd2b2e85c2c8ad21dbb.tar.gz
Stream creator impl
-rw-r--r--ydb/core/tx/replication/controller/stream_creator.cpp93
-rw-r--r--ydb/core/tx/replication/controller/stream_creator.h5
-rw-r--r--ydb/core/tx/replication/controller/target_base.cpp4
-rw-r--r--ydb/core/tx/replication/controller/target_base.h1
-rw-r--r--ydb/core/tx/replication/controller/target_with_stream.cpp3
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp52
6 files changed, 132 insertions, 26 deletions
diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp
index 456a12bf90..e98686c978 100644
--- a/ydb/core/tx/replication/controller/stream_creator.cpp
+++ b/ydb/core/tx/replication/controller/stream_creator.cpp
@@ -1,5 +1,7 @@
-#include "stream_creator.h"
+#include "logging.h"
#include "private_events.h"
+#include "stream_creator.h"
+#include "util.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -9,46 +11,97 @@
namespace NKikimr::NReplication::NController {
class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
+ static NYdb::NTable::TChangefeedDescription MakeChangefeed(const TString& name) {
+ using namespace NYdb::NTable;
+ return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
+ .WithInitialScan();
+ }
+
+ void CreateStream() {
+ switch (Kind) {
+ case TReplication::ETargetKind::Table:
+ Send(YdbProxy, new TEvYdbProxy::TEvAlterTableRequest(SrcPath, NYdb::NTable::TAlterTableSettings()
+ .AppendAddChangefeeds(Changefeed)));
+ break;
+ }
+
+ Become(&TThis::StateWork);
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle);
+ sFunc(TEvents::TEvWakeup, CreateStream);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+ void Handle(TEvYdbProxy::TEvAlterTableResponse::TPtr& ev) {
+ LOG_T("Handle " << ev->Get()->ToString());
+ auto& result = ev->Get()->Result;
+
+ if (!result.IsSuccess()) {
+ if (IsRetryableError(result)) {
+ LOG_D("Retry");
+ return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
+ }
+
+ LOG_E("Error"
+ << ": status# " << result.GetStatus()
+ << ", issues# " << result.GetIssues().ToOneLineString());
+ } else {
+ LOG_I("Success"
+ << ": issues# " << result.GetIssues().ToOneLineString());
+ }
+
+ Send(Parent, new TEvPrivate::TEvCreateStreamResult(ReplicationId, TargetId, std::move(result)));
+ PassAway();
+ }
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_STREAM_CREATOR;
}
- explicit TStreamCreator(const TActorId& parent, ui64 rid, ui64 tid, const TActorId& proxy)
+ explicit TStreamCreator(
+ const TActorId& parent,
+ const TActorId& proxy,
+ ui64 rid,
+ ui64 tid,
+ TReplication::ETargetKind kind,
+ const TString& srcPath,
+ const TString& streamName)
: Parent(parent)
+ , YdbProxy(proxy)
, ReplicationId(rid)
, TargetId(tid)
- , YdbProxy(proxy)
+ , Kind(kind)
+ , SrcPath(srcPath)
+ , Changefeed(MakeChangefeed(streamName))
+ , LogPrefix("StreamCreator", ReplicationId, TargetId)
{
- // TODO: remove it
- Y_UNUSED(Parent);
- Y_UNUSED(ReplicationId);
- Y_UNUSED(TargetId);
- Y_UNUSED(YdbProxy);
}
void Bootstrap() {
- // TODO: send request
- Become(&TThis::StateWork);
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- // TODO: handle response
- sFunc(TEvents::TEvPoison, PassAway);
- }
+ CreateStream();
}
private:
const TActorId Parent;
+ const TActorId YdbProxy;
const ui64 ReplicationId;
const ui64 TargetId;
- const TActorId YdbProxy;
+ const TReplication::ETargetKind Kind;
+ const TString SrcPath;
+ const NYdb::NTable::TChangefeedDescription Changefeed;
+ const TActorLogPrefix LogPrefix;
}; // TStreamCreator
-IActor* CreateStreamCreator(const TActorId& parent, ui64 rid, ui64 tid, const TActorId& proxy) {
- return new TStreamCreator(parent, rid, tid, proxy);
+IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
+ TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName)
+{
+ return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, streamName);
}
}
diff --git a/ydb/core/tx/replication/controller/stream_creator.h b/ydb/core/tx/replication/controller/stream_creator.h
index c2e57983a7..655a0c205a 100644
--- a/ydb/core/tx/replication/controller/stream_creator.h
+++ b/ydb/core/tx/replication/controller/stream_creator.h
@@ -1,9 +1,12 @@
#pragma once
+#include "replication.h"
+
#include <ydb/core/base/defs.h>
namespace NKikimr::NReplication::NController {
-IActor* CreateStreamCreator(const TActorId& parent, ui64 rid, ui64 tid, const TActorId& proxy);
+IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
+ TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName);
}
diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp
index c06040345c..9fce0155f0 100644
--- a/ydb/core/tx/replication/controller/target_base.cpp
+++ b/ydb/core/tx/replication/controller/target_base.cpp
@@ -75,6 +75,10 @@ ui64 TTargetBase::GetTargetId() const {
return TargetId;
}
+TReplication::ETargetKind TTargetBase::GetTargetKind() const {
+ return Kind;
+}
+
void TTargetBase::Progress(ui64 schemeShardId, const TActorId& proxy, const TActorContext& ctx) {
switch (DstState) {
case EDstState::Creating:
diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h
index 35c8c872fc..37b9133200 100644
--- a/ydb/core/tx/replication/controller/target_base.h
+++ b/ydb/core/tx/replication/controller/target_base.h
@@ -37,6 +37,7 @@ public:
protected:
ui64 GetReplicationId() const;
ui64 GetTargetId() const;
+ ETargetKind GetTargetKind() const;
private:
const ETargetKind Kind;
diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp
index 2cf09ed107..841e01581c 100644
--- a/ydb/core/tx/replication/controller/target_with_stream.cpp
+++ b/ydb/core/tx/replication/controller/target_with_stream.cpp
@@ -13,7 +13,8 @@ void TTargetWithStream::Progress(ui64 schemeShardId, const TActorId& proxy, cons
ctx.Send(ctx.SelfID, new TEvPrivate::TEvAssignStreamName(GetReplicationId(), GetTargetId()));
NameAssignmentInProcess = true;
} else if (!StreamCreator) {
- StreamCreator = ctx.Register(CreateStreamCreator(ctx.SelfID, GetReplicationId(), GetTargetId(), proxy));
+ StreamCreator = ctx.Register(CreateStreamCreator(ctx.SelfID, proxy,
+ GetReplicationId(), GetTargetId(), GetTargetKind(), GetSrcPath(), GetStreamName()));
}
return;
case EStreamState::Removing:
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
index 95ab198fc8..199cf2a359 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
@@ -8,8 +8,7 @@
#include <ydb/core/base/ticket_parser.h>
-namespace NKikimr {
-namespace NReplication {
+namespace NKikimr::NReplication {
Y_UNIT_TEST_SUITE(YdbProxyTests) {
template <bool UseDatabase = true>
@@ -465,7 +464,52 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
}
}
+ Y_UNIT_TEST(CreateCdcStream) {
+ TEnv<false> env;
+ // create table
+ {
+ auto schema = NYdb::NTable::TTableBuilder()
+ .AddNullableColumn("key", NYdb::EPrimitiveType::Uint64)
+ .AddNullableColumn("value", NYdb::EPrimitiveType::Utf8)
+ .SetPrimaryKeyColumn("key")
+ .Build();
+
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTableResponse>(
+ new TEvYdbProxy::TEvCreateTableRequest("/Root/table", std::move(schema), {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+
+ const auto feed = NYdb::NTable::TChangefeedDescription("updates",
+ NYdb::NTable::EChangefeedMode::Updates, NYdb::NTable::EChangefeedFormat::Json
+ );
+
+ // two attempts: create, check, retry, check
+ for (int i = 1; i <= 2; ++i) {
+ // create cdc stream
+ {
+ auto settings = NYdb::NTable::TAlterTableSettings()
+ .AppendAddChangefeeds(feed);
+
+ auto ev = env.Send<TEvYdbProxy::TEvAlterTableResponse>(
+ new TEvYdbProxy::TEvAlterTableRequest("/Root/table", settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // describe
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDescribeTableResponse>(
+ new TEvYdbProxy::TEvDescribeTableRequest("/Root/table", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+
+ const auto& schema = ev->Get()->Result.GetTableDescription();
+ UNIT_ASSERT_EQUAL(schema.GetChangefeedDescriptions().size(), 1);
+ UNIT_ASSERT_EQUAL(schema.GetChangefeedDescriptions().at(0), feed);
+ }
+ }
+ }
+
} // YdbProxyTests
-} // NReplication
-} // NKikimr
+}