aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-10-02 21:42:12 +0300
committerGitHub <noreply@github.com>2024-10-02 18:42:12 +0000
commit91f8a44b4102ecbc5d62bfc87430117cc5719e89 (patch)
tree2a85b02ca7efa75df649fb449153e3e87ce5c903
parentea37d52d46be3780e17c129fe232aa8187d68272 (diff)
downloadydb-91f8a44b4102ecbc5d62bfc87430117cc5719e89.tar.gz
Better handling of connection loss (#9993)
-rw-r--r--ydb/core/change_exchange/change_sender.cpp18
-rw-r--r--ydb/core/change_exchange/change_sender.h4
-rw-r--r--ydb/core/change_exchange/util.cpp15
-rw-r--r--ydb/core/change_exchange/util.h9
-rw-r--r--ydb/core/change_exchange/ya.make1
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp32
-rw-r--r--ydb/core/tx/datashard/change_sender_table_base.h21
-rw-r--r--ydb/core/tx/replication/service/base_table_writer.cpp22
8 files changed, 71 insertions, 51 deletions
diff --git a/ydb/core/change_exchange/change_sender.cpp b/ydb/core/change_exchange/change_sender.cpp
index 78a8baf488..df2de3ebf8 100644
--- a/ydb/core/change_exchange/change_sender.cpp
+++ b/ydb/core/change_exchange/change_sender.cpp
@@ -61,20 +61,27 @@ void TChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) {
}
}
-void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged) {
- if (partitioningChanged) {
+void TChangeSender::CreateSendersImpl(const TVector<ui64>& partitionIds) {
+ if (partitionIds) {
CreateMissingSenders(partitionIds);
} else {
- RecreateSenders(GonePartitions);
+ RecreateSenders(std::exchange(GonePartitions, {}));
}
- GonePartitions.clear();
-
if (!Enqueued || !RequestRecords()) {
SendRecords();
}
}
+void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
+ Y_ABORT_UNLESS(partitionIds);
+ CreateSendersImpl(partitionIds);
+}
+
+void TChangeSender::CreateSenders() {
+ CreateSendersImpl({});
+}
+
void TChangeSender::KillSenders() {
for (const auto& [_, sender] : std::exchange(Senders, {})) {
if (sender.ActorId) {
@@ -303,6 +310,7 @@ void TChangeSender::OnGone(ui64 partitionId) {
if (it->second.Ready) {
--ReadySenders;
}
+
Senders.erase(it);
GonePartitions.push_back(partitionId);
diff --git a/ydb/core/change_exchange/change_sender.h b/ydb/core/change_exchange/change_sender.h
index fd8e1ad387..c010213bf6 100644
--- a/ydb/core/change_exchange/change_sender.h
+++ b/ydb/core/change_exchange/change_sender.h
@@ -113,6 +113,7 @@ class TChangeSender {
THashSet<ui64> CompletedPartitions;
};
+ void CreateSendersImpl(const TVector<ui64>& partitionIds);
void LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId);
void RegisterSender(ui64 partitionId);
void CreateMissingSenders(const TVector<ui64>& partitionIds);
@@ -150,7 +151,8 @@ protected:
return ChangeServer;
}
- void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true);
+ void CreateSenders(const TVector<ui64>& partitionIds); // creates senders after partitioning changes
+ void CreateSenders(); // creates senders after connection loss
void KillSenders();
void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records);
diff --git a/ydb/core/change_exchange/util.cpp b/ydb/core/change_exchange/util.cpp
new file mode 100644
index 0000000000..c4c0516e0e
--- /dev/null
+++ b/ydb/core/change_exchange/util.cpp
@@ -0,0 +1,15 @@
+#include "util.h"
+
+namespace NKikimr::NChangeExchange {
+
+TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
+ TVector<ui64> result(::Reserve(partitions.size()));
+
+ for (const auto& partition : partitions) {
+ result.push_back(partition.ShardId);
+ }
+
+ return result;
+}
+
+}
diff --git a/ydb/core/change_exchange/util.h b/ydb/core/change_exchange/util.h
new file mode 100644
index 0000000000..f8ba146fde
--- /dev/null
+++ b/ydb/core/change_exchange/util.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/core/scheme/scheme_tabledefs.h>
+
+namespace NKikimr::NChangeExchange {
+
+TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions);
+
+}
diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make
index 468932eef0..5b1e5e6fc6 100644
--- a/ydb/core/change_exchange/ya.make
+++ b/ydb/core/change_exchange/ya.make
@@ -6,6 +6,7 @@ SRCS(
change_sender.cpp
change_sender_monitoring.cpp
resolve_partition.cpp
+ util.cpp
)
GENERATE_ENUM_SERIALIZATION(change_record.h)
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 1153804095..59f352ecaa 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/change_exchange/change_sender.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
+#include <ydb/core/change_exchange/util.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
@@ -440,16 +441,6 @@ class TCdcChangeSenderMain
return false;
}
- static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) {
- TVector<ui64> result(Reserve(partitions.size()));
-
- for (const auto& partition : partitions) {
- result.push_back(partition.ShardId);
- }
-
- return result;
- }
-
/// ResolveCdcStream
void ResolveCdcStream() {
@@ -571,6 +562,14 @@ class TCdcChangeSenderMain
return;
}
+ const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
+ if (TopicVersion && TopicVersion == topicVersion) {
+ CreateSenders();
+ return Become(&TThis::StateMain);
+ }
+
+ TopicVersion = topicVersion;
+
const auto& pqDesc = entry.PQGroupInfo->Description;
const auto& pqConfig = pqDesc.GetPQTabletConfig();
@@ -579,12 +578,7 @@ class TCdcChangeSenderMain
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
}
- const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
- const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
- TopicVersion = topicVersion;
-
- auto topicAutoPartitioning = ::NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED != pqConfig.GetPartitionStrategy().GetPartitionStrategyType();
-
+ const bool topicAutoPartitioning = IsTopicAutoPartitioningEnabled(pqConfig.GetPartitionStrategy().GetPartitionStrategyType());
Y_ABORT_UNLESS(topicAutoPartitioning || entry.PQGroupInfo->Schema);
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
@@ -598,10 +592,14 @@ class TCdcChangeSenderMain
SetPartitionResolver(new TMd5PartitionResolver(KeyDesc->GetPartitions().size()));
}
- CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged);
+ CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning));
Become(&TThis::StateMain);
}
+ static bool IsTopicAutoPartitioningEnabled(NKikimrPQ::TPQTabletConfig::TPartitionStrategyType strategy) {
+ return strategy != NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
+ }
+
/// Main
STATEFN(StateMain) {
diff --git a/ydb/core/tx/datashard/change_sender_table_base.h b/ydb/core/tx/datashard/change_sender_table_base.h
index b20b334852..4756aa7964 100644
--- a/ydb/core/tx/datashard/change_sender_table_base.h
+++ b/ydb/core/tx/datashard/change_sender_table_base.h
@@ -2,6 +2,7 @@
#include "change_exchange_helpers.h"
+#include <ydb/core/change_exchange/util.h>
#include <ydb/core/tablet_flat/flat_row_state.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
@@ -151,6 +152,11 @@ private:
return;
}
+ if (AsDerived()->TargetTableVersion && AsDerived()->TargetTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
+ AsDerived()->CreateSenders();
+ return AsDerived()->Serve();
+ }
+
AsDerived()->TagMap.clear();
TVector<NScheme::TTypeInfo> keyColumnTypes;
@@ -181,7 +187,6 @@ private:
);
AsDerived()->SetPartitionResolver(CreateDefaultPartitionResolver(*AsDerived()->KeyDesc.Get()));
-
AsDerived()->NextState(TStateTag{});
}
};
@@ -245,24 +250,12 @@ private:
return AsDerived()->Retry();
}
- const bool versionChanged = !AsDerived()->TargetTableVersion || AsDerived()->TargetTableVersion != entry.GeneralVersion;
AsDerived()->TargetTableVersion = entry.GeneralVersion;
-
AsDerived()->KeyDesc = std::move(entry.KeyDescription);
- AsDerived()->CreateSenders(MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()), versionChanged);
+ AsDerived()->CreateSenders(NChangeExchange::MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()));
AsDerived()->NextState(TStateTag{});
}
-
- static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
- TVector<ui64> result(Reserve(partitions.size()));
-
- for (const auto& partition : partitions) {
- result.push_back(partition.ShardId); // partition = shard
- }
-
- return result;
- }
};
template <typename TDerived>
diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp
index bfdd3c8d4b..ce9a0875cc 100644
--- a/ydb/core/tx/replication/service/base_table_writer.cpp
+++ b/ydb/core/tx/replication/service/base_table_writer.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender.h>
+#include <ydb/core/change_exchange/util.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
@@ -268,16 +269,6 @@ class TLocalTableWriter
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected);
}
- static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
- TVector<ui64> result(::Reserve(partitions.size()));
-
- for (const auto& partition : partitions) {
- result.push_back(partition.ShardId);
- }
-
- return result;
- }
-
void Registered(TActorSystem*, const TActorId&) override {
ChangeServer = SelfId();
}
@@ -338,6 +329,12 @@ class TLocalTableWriter
return;
}
+ if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
+ Y_ABORT_UNLESS(Initialized);
+ Resolving = false;
+ return CreateSenders();
+ }
+
auto schema = MakeIntrusive<TLightweightSchema>();
if (entry.Self && entry.Self->Info.HasVersion()) {
schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion();
@@ -370,7 +367,6 @@ class TLocalTableWriter
);
TChangeSender::SetPartitionResolver(CreateResolverFn(*KeyDesc.Get()));
-
ResolveKeys();
}
@@ -408,11 +404,9 @@ class TLocalTableWriter
return LogWarnAndRetry("Empty partitions");
}
- const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion;
TableVersion = entry.GeneralVersion;
-
KeyDesc = std::move(entry.KeyDescription);
- CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
+ CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));
if (!Initialized) {
Send(Worker, new TEvWorker::TEvHandshake());