diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-10-19 17:44:43 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-10-19 18:49:11 +0300 |
commit | daf7f8c2fc7abbc32673f9517f22d9714d79724c (patch) | |
tree | 1686ce7eccc9222c2d531dcda1a1cc6be2632b27 | |
parent | 65074cfc8b34a30865a46e920360967c30694422 (diff) | |
download | ydb-daf7f8c2fc7abbc32673f9517f22d9714d79724c.tar.gz |
Recreate gone cdc actors if version was not changed KIKIMR-19778
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 40 |
1 files changed, 30 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 67650b9b268..f103e3680d0 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -36,6 +36,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti /// Init + void Init() { + auto opts = TPartitionWriterOpts() + .WithCheckState(true) + .WithAutoRegister(true); + Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), {}, ShardId, PartitionId, {}, SourceId, opts)); + Become(&TThis::StateInit); + } + STATEFN(StateInit) { switch (ev->GetTypeRewrite()) { hFunc(TEvPartitionWriter::TEvInitResult, Handle); @@ -274,7 +282,13 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti void Disconnected() { LOG_D("Disconnected"); - Leave(); + + if (CurrentStateFunc() != static_cast<TReceiveFunc>(&TThis::StateInit)) { + return Leave(); + } + + CloseWriter(); + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup()); } void Lost() { @@ -287,11 +301,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti PassAway(); } - void PassAway() override { - if (Writer) { - Send(Writer, new TEvents::TEvPoisonPill()); + void CloseWriter() { + if (const auto& writer = std::exchange(Writer, {})) { + Send(writer, new TEvents::TEvPoisonPill()); } + } + void PassAway() override { + CloseWriter(); TActorBootstrapped::PassAway(); } @@ -316,17 +333,14 @@ public: } void Bootstrap() { - auto opts = TPartitionWriterOpts() - .WithCheckState(true) - .WithAutoRegister(true); - Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), {}, ShardId, PartitionId, {}, SourceId, opts)); - Become(&TThis::StateInit); + Init(); } STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { sFunc(TEvPartitionWriter::TEvDisconnected, Disconnected); hFunc(NMon::TEvRemoteHttpInfo, Handle); + sFunc(TEvents::TEvWakeup, Init); sFunc(TEvents::TEvPoison, PassAway); } } @@ -692,7 +706,11 @@ class TCdcChangeSenderMain Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined()); } - CreateSenders(MakePartitionIds(KeyDesc->Partitions)); + const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); + const bool versionChanged = !TopicVersion || TopicVersion != topicVersion; + TopicVersion = topicVersion; + + CreateSenders(MakePartitionIds(KeyDesc->Partitions), versionChanged); Become(&TThis::StateMain); } @@ -812,6 +830,7 @@ public: explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId) : TActorBootstrapped() , TBaseChangeSender(this, this, dataShard, streamPathId) + , TopicVersion(0) { } @@ -846,6 +865,7 @@ private: TUserTable::TCdcStream Stream; TPathId TopicPathId; + ui64 TopicVersion; THolder<TKeyDesc> KeyDesc; THashMap<ui32, ui64> PartitionToShard; |