aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-10-19 17:44:43 +0300
committerilnaz <ilnaz@ydb.tech>2023-10-19 18:49:11 +0300
commitdaf7f8c2fc7abbc32673f9517f22d9714d79724c (patch)
tree1686ce7eccc9222c2d531dcda1a1cc6be2632b27
parent65074cfc8b34a30865a46e920360967c30694422 (diff)
downloadydb-daf7f8c2fc7abbc32673f9517f22d9714d79724c.tar.gz
Recreate gone cdc actors if version was not changed KIKIMR-19778
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp40
1 files changed, 30 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 67650b9b268..f103e3680d0 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -36,6 +36,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
/// Init
+ void Init() {
+ auto opts = TPartitionWriterOpts()
+ .WithCheckState(true)
+ .WithAutoRegister(true);
+ Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), {}, ShardId, PartitionId, {}, SourceId, opts));
+ Become(&TThis::StateInit);
+ }
+
STATEFN(StateInit) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPartitionWriter::TEvInitResult, Handle);
@@ -274,7 +282,13 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
void Disconnected() {
LOG_D("Disconnected");
- Leave();
+
+ if (CurrentStateFunc() != static_cast<TReceiveFunc>(&TThis::StateInit)) {
+ return Leave();
+ }
+
+ CloseWriter();
+ Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup());
}
void Lost() {
@@ -287,11 +301,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
PassAway();
}
- void PassAway() override {
- if (Writer) {
- Send(Writer, new TEvents::TEvPoisonPill());
+ void CloseWriter() {
+ if (const auto& writer = std::exchange(Writer, {})) {
+ Send(writer, new TEvents::TEvPoisonPill());
}
+ }
+ void PassAway() override {
+ CloseWriter();
TActorBootstrapped::PassAway();
}
@@ -316,17 +333,14 @@ public:
}
void Bootstrap() {
- auto opts = TPartitionWriterOpts()
- .WithCheckState(true)
- .WithAutoRegister(true);
- Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), {}, ShardId, PartitionId, {}, SourceId, opts));
- Become(&TThis::StateInit);
+ Init();
}
STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvPartitionWriter::TEvDisconnected, Disconnected);
hFunc(NMon::TEvRemoteHttpInfo, Handle);
+ sFunc(TEvents::TEvWakeup, Init);
sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -692,7 +706,11 @@ class TCdcChangeSenderMain
Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined());
}
- CreateSenders(MakePartitionIds(KeyDesc->Partitions));
+ const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
+ const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
+ TopicVersion = topicVersion;
+
+ CreateSenders(MakePartitionIds(KeyDesc->Partitions), versionChanged);
Become(&TThis::StateMain);
}
@@ -812,6 +830,7 @@ public:
explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId)
: TActorBootstrapped()
, TBaseChangeSender(this, this, dataShard, streamPathId)
+ , TopicVersion(0)
{
}
@@ -846,6 +865,7 @@ private:
TUserTable::TCdcStream Stream;
TPathId TopicPathId;
+ ui64 TopicVersion;
THolder<TKeyDesc> KeyDesc;
THashMap<ui32, ui64> PartitionToShard;