diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-10-02 21:42:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-02 18:42:12 +0000 |
commit | 91f8a44b4102ecbc5d62bfc87430117cc5719e89 (patch) | |
tree | 2a85b02ca7efa75df649fb449153e3e87ce5c903 | |
parent | ea37d52d46be3780e17c129fe232aa8187d68272 (diff) | |
download | ydb-91f8a44b4102ecbc5d62bfc87430117cc5719e89.tar.gz |
Better handling of connection loss (#9993)
-rw-r--r-- | ydb/core/change_exchange/change_sender.cpp | 18 | ||||
-rw-r--r-- | ydb/core/change_exchange/change_sender.h | 4 | ||||
-rw-r--r-- | ydb/core/change_exchange/util.cpp | 15 | ||||
-rw-r--r-- | ydb/core/change_exchange/util.h | 9 | ||||
-rw-r--r-- | ydb/core/change_exchange/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 32 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_table_base.h | 21 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/base_table_writer.cpp | 22 |
8 files changed, 71 insertions, 51 deletions
diff --git a/ydb/core/change_exchange/change_sender.cpp b/ydb/core/change_exchange/change_sender.cpp index 78a8baf488..df2de3ebf8 100644 --- a/ydb/core/change_exchange/change_sender.cpp +++ b/ydb/core/change_exchange/change_sender.cpp @@ -61,20 +61,27 @@ void TChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) { } } -void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged) { - if (partitioningChanged) { +void TChangeSender::CreateSendersImpl(const TVector<ui64>& partitionIds) { + if (partitionIds) { CreateMissingSenders(partitionIds); } else { - RecreateSenders(GonePartitions); + RecreateSenders(std::exchange(GonePartitions, {})); } - GonePartitions.clear(); - if (!Enqueued || !RequestRecords()) { SendRecords(); } } +void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds) { + Y_ABORT_UNLESS(partitionIds); + CreateSendersImpl(partitionIds); +} + +void TChangeSender::CreateSenders() { + CreateSendersImpl({}); +} + void TChangeSender::KillSenders() { for (const auto& [_, sender] : std::exchange(Senders, {})) { if (sender.ActorId) { @@ -303,6 +310,7 @@ void TChangeSender::OnGone(ui64 partitionId) { if (it->second.Ready) { --ReadySenders; } + Senders.erase(it); GonePartitions.push_back(partitionId); diff --git a/ydb/core/change_exchange/change_sender.h b/ydb/core/change_exchange/change_sender.h index fd8e1ad387..c010213bf6 100644 --- a/ydb/core/change_exchange/change_sender.h +++ b/ydb/core/change_exchange/change_sender.h @@ -113,6 +113,7 @@ class TChangeSender { THashSet<ui64> CompletedPartitions; }; + void CreateSendersImpl(const TVector<ui64>& partitionIds); void LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId); void RegisterSender(ui64 partitionId); void CreateMissingSenders(const TVector<ui64>& partitionIds); @@ -150,7 +151,8 @@ protected: return ChangeServer; } - void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true); + void CreateSenders(const TVector<ui64>& partitionIds); // creates senders after partitioning changes + void CreateSenders(); // creates senders after connection loss void KillSenders(); void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records); diff --git a/ydb/core/change_exchange/util.cpp b/ydb/core/change_exchange/util.cpp new file mode 100644 index 0000000000..c4c0516e0e --- /dev/null +++ b/ydb/core/change_exchange/util.cpp @@ -0,0 +1,15 @@ +#include "util.h" + +namespace NKikimr::NChangeExchange { + +TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) { + TVector<ui64> result(::Reserve(partitions.size())); + + for (const auto& partition : partitions) { + result.push_back(partition.ShardId); + } + + return result; +} + +} diff --git a/ydb/core/change_exchange/util.h b/ydb/core/change_exchange/util.h new file mode 100644 index 0000000000..f8ba146fde --- /dev/null +++ b/ydb/core/change_exchange/util.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/core/scheme/scheme_tabledefs.h> + +namespace NKikimr::NChangeExchange { + +TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions); + +} diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make index 468932eef0..5b1e5e6fc6 100644 --- a/ydb/core/change_exchange/ya.make +++ b/ydb/core/change_exchange/ya.make @@ -6,6 +6,7 @@ SRCS( change_sender.cpp change_sender_monitoring.cpp resolve_partition.cpp + util.cpp ) GENERATE_ENUM_SERIALIZATION(change_record.h) diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 1153804095..59f352ecaa 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -6,6 +6,7 @@ #include <ydb/core/change_exchange/change_sender.h> #include <ydb/core/change_exchange/change_sender_monitoring.h> +#include <ydb/core/change_exchange/util.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/tx/scheme_cache/helpers.h> @@ -440,16 +441,6 @@ class TCdcChangeSenderMain return false; } - static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) { - TVector<ui64> result(Reserve(partitions.size())); - - for (const auto& partition : partitions) { - result.push_back(partition.ShardId); - } - - return result; - } - /// ResolveCdcStream void ResolveCdcStream() { @@ -571,6 +562,14 @@ class TCdcChangeSenderMain return; } + const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); + if (TopicVersion && TopicVersion == topicVersion) { + CreateSenders(); + return Become(&TThis::StateMain); + } + + TopicVersion = topicVersion; + const auto& pqDesc = entry.PQGroupInfo->Description; const auto& pqConfig = pqDesc.GetPQTabletConfig(); @@ -579,12 +578,7 @@ class TCdcChangeSenderMain PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId()); } - const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); - const bool versionChanged = !TopicVersion || TopicVersion != topicVersion; - TopicVersion = topicVersion; - - auto topicAutoPartitioning = ::NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED != pqConfig.GetPartitionStrategy().GetPartitionStrategyType(); - + const bool topicAutoPartitioning = IsTopicAutoPartitioningEnabled(pqConfig.GetPartitionStrategy().GetPartitionStrategyType()); Y_ABORT_UNLESS(topicAutoPartitioning || entry.PQGroupInfo->Schema); KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema); Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning); @@ -598,10 +592,14 @@ class TCdcChangeSenderMain SetPartitionResolver(new TMd5PartitionResolver(KeyDesc->GetPartitions().size())); } - CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged); + CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning)); Become(&TThis::StateMain); } + static bool IsTopicAutoPartitioningEnabled(NKikimrPQ::TPQTabletConfig::TPartitionStrategyType strategy) { + return strategy != NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED; + } + /// Main STATEFN(StateMain) { diff --git a/ydb/core/tx/datashard/change_sender_table_base.h b/ydb/core/tx/datashard/change_sender_table_base.h index b20b334852..4756aa7964 100644 --- a/ydb/core/tx/datashard/change_sender_table_base.h +++ b/ydb/core/tx/datashard/change_sender_table_base.h @@ -2,6 +2,7 @@ #include "change_exchange_helpers.h" +#include <ydb/core/change_exchange/util.h> #include <ydb/core/tablet_flat/flat_row_state.h> #include <ydb/core/tx/scheme_cache/helpers.h> @@ -151,6 +152,11 @@ private: return; } + if (AsDerived()->TargetTableVersion && AsDerived()->TargetTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) { + AsDerived()->CreateSenders(); + return AsDerived()->Serve(); + } + AsDerived()->TagMap.clear(); TVector<NScheme::TTypeInfo> keyColumnTypes; @@ -181,7 +187,6 @@ private: ); AsDerived()->SetPartitionResolver(CreateDefaultPartitionResolver(*AsDerived()->KeyDesc.Get())); - AsDerived()->NextState(TStateTag{}); } }; @@ -245,24 +250,12 @@ private: return AsDerived()->Retry(); } - const bool versionChanged = !AsDerived()->TargetTableVersion || AsDerived()->TargetTableVersion != entry.GeneralVersion; AsDerived()->TargetTableVersion = entry.GeneralVersion; - AsDerived()->KeyDesc = std::move(entry.KeyDescription); - AsDerived()->CreateSenders(MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()), versionChanged); + AsDerived()->CreateSenders(NChangeExchange::MakePartitionIds(AsDerived()->KeyDesc->GetPartitions())); AsDerived()->NextState(TStateTag{}); } - - static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) { - TVector<ui64> result(Reserve(partitions.size())); - - for (const auto& partition : partitions) { - result.push_back(partition.ShardId); // partition = shard - } - - return result; - } }; template <typename TDerived> diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp index bfdd3c8d4b..ce9a0875cc 100644 --- a/ydb/core/tx/replication/service/base_table_writer.cpp +++ b/ydb/core/tx/replication/service/base_table_writer.cpp @@ -4,6 +4,7 @@ #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/change_exchange/change_sender.h> +#include <ydb/core/change_exchange/util.h> #include <ydb/core/tablet_flat/flat_row_eggs.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/scheme_cache/helpers.h> @@ -268,16 +269,6 @@ class TLocalTableWriter return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected); } - static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) { - TVector<ui64> result(::Reserve(partitions.size())); - - for (const auto& partition : partitions) { - result.push_back(partition.ShardId); - } - - return result; - } - void Registered(TActorSystem*, const TActorId&) override { ChangeServer = SelfId(); } @@ -338,6 +329,12 @@ class TLocalTableWriter return; } + if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) { + Y_ABORT_UNLESS(Initialized); + Resolving = false; + return CreateSenders(); + } + auto schema = MakeIntrusive<TLightweightSchema>(); if (entry.Self && entry.Self->Info.HasVersion()) { schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion(); @@ -370,7 +367,6 @@ class TLocalTableWriter ); TChangeSender::SetPartitionResolver(CreateResolverFn(*KeyDesc.Get())); - ResolveKeys(); } @@ -408,11 +404,9 @@ class TLocalTableWriter return LogWarnAndRetry("Empty partitions"); } - const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion; TableVersion = entry.GeneralVersion; - KeyDesc = std::move(entry.KeyDescription); - CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged); + CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions())); if (!Initialized) { Send(Worker, new TEvWorker::TEvHandshake()); |