diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-02-08 14:17:51 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-02-08 14:17:51 +0300 |
commit | adf98907aa45c85b4c57dcd2b2e85c2c8ad21dbb (patch) | |
tree | 97bd4e71001ef86a26ada89ac826e9c20a873e5a | |
parent | 63721d79509d1f5bcffac38a85c0dfc4cb084893 (diff) | |
download | ydb-adf98907aa45c85b4c57dcd2b2e85c2c8ad21dbb.tar.gz |
Stream creator impl
6 files changed, 132 insertions, 26 deletions
diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index 456a12bf90..e98686c978 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -1,5 +1,7 @@ -#include "stream_creator.h" +#include "logging.h" #include "private_events.h" +#include "stream_creator.h" +#include "util.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -9,46 +11,97 @@ namespace NKikimr::NReplication::NController { class TStreamCreator: public TActorBootstrapped<TStreamCreator> { + static NYdb::NTable::TChangefeedDescription MakeChangefeed(const TString& name) { + using namespace NYdb::NTable; + return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json) + .WithInitialScan(); + } + + void CreateStream() { + switch (Kind) { + case TReplication::ETargetKind::Table: + Send(YdbProxy, new TEvYdbProxy::TEvAlterTableRequest(SrcPath, NYdb::NTable::TAlterTableSettings() + .AppendAddChangefeeds(Changefeed))); + break; + } + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle); + sFunc(TEvents::TEvWakeup, CreateStream); + sFunc(TEvents::TEvPoison, PassAway); + } + } + + void Handle(TEvYdbProxy::TEvAlterTableResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + auto& result = ev->Get()->Result; + + if (!result.IsSuccess()) { + if (IsRetryableError(result)) { + LOG_D("Retry"); + return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup); + } + + LOG_E("Error" + << ": status# " << result.GetStatus() + << ", issues# " << result.GetIssues().ToOneLineString()); + } else { + LOG_I("Success" + << ": issues# " << result.GetIssues().ToOneLineString()); + } + + Send(Parent, new TEvPrivate::TEvCreateStreamResult(ReplicationId, TargetId, std::move(result))); + PassAway(); + } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::REPLICATION_CONTROLLER_STREAM_CREATOR; } - explicit TStreamCreator(const TActorId& parent, ui64 rid, ui64 tid, const TActorId& proxy) + explicit TStreamCreator( + const TActorId& parent, + const TActorId& proxy, + ui64 rid, + ui64 tid, + TReplication::ETargetKind kind, + const TString& srcPath, + const TString& streamName) : Parent(parent) + , YdbProxy(proxy) , ReplicationId(rid) , TargetId(tid) - , YdbProxy(proxy) + , Kind(kind) + , SrcPath(srcPath) + , Changefeed(MakeChangefeed(streamName)) + , LogPrefix("StreamCreator", ReplicationId, TargetId) { - // TODO: remove it - Y_UNUSED(Parent); - Y_UNUSED(ReplicationId); - Y_UNUSED(TargetId); - Y_UNUSED(YdbProxy); } void Bootstrap() { - // TODO: send request - Become(&TThis::StateWork); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - // TODO: handle response - sFunc(TEvents::TEvPoison, PassAway); - } + CreateStream(); } private: const TActorId Parent; + const TActorId YdbProxy; const ui64 ReplicationId; const ui64 TargetId; - const TActorId YdbProxy; + const TReplication::ETargetKind Kind; + const TString SrcPath; + const NYdb::NTable::TChangefeedDescription Changefeed; + const TActorLogPrefix LogPrefix; }; // TStreamCreator -IActor* CreateStreamCreator(const TActorId& parent, ui64 rid, ui64 tid, const TActorId& proxy) { - return new TStreamCreator(parent, rid, tid, proxy); +IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, + TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName) +{ + return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, streamName); } } diff --git a/ydb/core/tx/replication/controller/stream_creator.h b/ydb/core/tx/replication/controller/stream_creator.h index c2e57983a7..655a0c205a 100644 --- a/ydb/core/tx/replication/controller/stream_creator.h +++ b/ydb/core/tx/replication/controller/stream_creator.h @@ -1,9 +1,12 @@ #pragma once +#include "replication.h" + #include <ydb/core/base/defs.h> namespace NKikimr::NReplication::NController { -IActor* CreateStreamCreator(const TActorId& parent, ui64 rid, ui64 tid, const TActorId& proxy); +IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, + TReplication::ETargetKind kind, const TString& srcPath, const TString& streamName); } diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index c06040345c..9fce0155f0 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -75,6 +75,10 @@ ui64 TTargetBase::GetTargetId() const { return TargetId; } +TReplication::ETargetKind TTargetBase::GetTargetKind() const { + return Kind; +} + void TTargetBase::Progress(ui64 schemeShardId, const TActorId& proxy, const TActorContext& ctx) { switch (DstState) { case EDstState::Creating: diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h index 35c8c872fc..37b9133200 100644 --- a/ydb/core/tx/replication/controller/target_base.h +++ b/ydb/core/tx/replication/controller/target_base.h @@ -37,6 +37,7 @@ public: protected: ui64 GetReplicationId() const; ui64 GetTargetId() const; + ETargetKind GetTargetKind() const; private: const ETargetKind Kind; diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp index 2cf09ed107..841e01581c 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.cpp +++ b/ydb/core/tx/replication/controller/target_with_stream.cpp @@ -13,7 +13,8 @@ void TTargetWithStream::Progress(ui64 schemeShardId, const TActorId& proxy, cons ctx.Send(ctx.SelfID, new TEvPrivate::TEvAssignStreamName(GetReplicationId(), GetTargetId())); NameAssignmentInProcess = true; } else if (!StreamCreator) { - StreamCreator = ctx.Register(CreateStreamCreator(ctx.SelfID, GetReplicationId(), GetTargetId(), proxy)); + StreamCreator = ctx.Register(CreateStreamCreator(ctx.SelfID, proxy, + GetReplicationId(), GetTargetId(), GetTargetKind(), GetSrcPath(), GetStreamName())); } return; case EStreamState::Removing: diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index 95ab198fc8..199cf2a359 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -8,8 +8,7 @@ #include <ydb/core/base/ticket_parser.h> -namespace NKikimr { -namespace NReplication { +namespace NKikimr::NReplication { Y_UNIT_TEST_SUITE(YdbProxyTests) { template <bool UseDatabase = true> @@ -465,7 +464,52 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { } } + Y_UNIT_TEST(CreateCdcStream) { + TEnv<false> env; + // create table + { + auto schema = NYdb::NTable::TTableBuilder() + .AddNullableColumn("key", NYdb::EPrimitiveType::Uint64) + .AddNullableColumn("value", NYdb::EPrimitiveType::Utf8) + .SetPrimaryKeyColumn("key") + .Build(); + + auto ev = env.Send<TEvYdbProxy::TEvCreateTableResponse>( + new TEvYdbProxy::TEvCreateTableRequest("/Root/table", std::move(schema), {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + + const auto feed = NYdb::NTable::TChangefeedDescription("updates", + NYdb::NTable::EChangefeedMode::Updates, NYdb::NTable::EChangefeedFormat::Json + ); + + // two attempts: create, check, retry, check + for (int i = 1; i <= 2; ++i) { + // create cdc stream + { + auto settings = NYdb::NTable::TAlterTableSettings() + .AppendAddChangefeeds(feed); + + auto ev = env.Send<TEvYdbProxy::TEvAlterTableResponse>( + new TEvYdbProxy::TEvAlterTableRequest("/Root/table", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // describe + { + auto ev = env.Send<TEvYdbProxy::TEvDescribeTableResponse>( + new TEvYdbProxy::TEvDescribeTableRequest("/Root/table", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + + const auto& schema = ev->Get()->Result.GetTableDescription(); + UNIT_ASSERT_EQUAL(schema.GetChangefeedDescriptions().size(), 1); + UNIT_ASSERT_EQUAL(schema.GetChangefeedDescriptions().at(0), feed); + } + } + } + } // YdbProxyTests -} // NReplication -} // NKikimr +} |