diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-06-20 14:15:06 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-06-20 14:15:06 +0300 |
commit | d72d3725bf30350da6b32682f936dc20f6b984eb (patch) | |
tree | 4534a985c9955bad52abe96dcad2bdac1d63fafa | |
parent | 5e86de883f882a9b9af25a9f211ae75e55db0702 (diff) | |
download | ydb-d72d3725bf30350da6b32682f936dc20f6b984eb.tar.gz |
Recreate gone actors if version was not changed
-rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/scheme_board/cache.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/scheme_cache/scheme_cache.h | 1 |
5 files changed, 38 insertions, 4 deletions
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 99a25a8b59..09a63971a0 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -624,8 +624,11 @@ class TAsyncIndexChangeSenderMain return Retry(); } + const bool versionChanged = !IndexTableVersion || IndexTableVersion != entry.GeneralVersion; + IndexTableVersion = entry.GeneralVersion; + KeyDesc = std::move(entry.KeyDescription); - CreateSenders(MakePartitionIds(KeyDesc->GetPartitions())); + CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged); Become(&TThis::StateMain); } @@ -723,6 +726,7 @@ public: : TActorBootstrapped() , TBaseChangeSender(this, this, dataShard, indexPathId) , UserTableId(userTableId) + , IndexTableVersion(0) { } @@ -751,6 +755,7 @@ private: TMap<TTag, TTag> TagMap; // from main to index TPathId IndexTablePathId; + ui64 IndexTableVersion; THolder<TKeyDesc> KeyDesc; }; // TAsyncIndexChangeSenderMain diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index 68094c6dad..4d8ab339b8 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NDataShard { -void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) { +void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) { THashMap<ui64, TSender> senders; for (const auto& partitionId : partitionIds) { @@ -32,6 +32,24 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) { } Senders = std::move(senders); +} + +void TBaseChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) { + for (const auto& partitionId : partitionIds) { + Y_VERIFY(!Senders.contains(partitionId)); + auto& sender = Senders[partitionId]; + sender.ActorId = ActorOps->Register(CreateSender(partitionId)); + } +} + +void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged) { + if (partitioningChanged) { + CreateMissingSenders(partitionIds); + } else { + RecreateSenders(GonePartitions); + } + + GonePartitions.clear(); if (!Enqueued || !RequestRecords()) { SendRecords(); @@ -199,6 +217,7 @@ void TBaseChangeSender::OnGone(ui64 partitionId) { } Senders.erase(it); + GonePartitions.push_back(partitionId); if (Resolver->IsResolving()) { return; diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index a5de292ecd..26f5f6efed 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -57,7 +57,7 @@ class IChangeSender { public: virtual ~IChangeSender() = default; - virtual void CreateSenders(const TVector<ui64>& partitionIds) = 0; + virtual void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) = 0; virtual void KillSenders() = 0; virtual IActor* CreateSender(ui64 partitionId) = 0; virtual void RemoveRecords() = 0; @@ -89,6 +89,9 @@ class TBaseChangeSender: public IChangeSender { TVector<TEnqueuedRecord> Pending; }; + void CreateMissingSenders(const TVector<ui64>& partitionIds); + void RecreateSenders(const TVector<ui64>& partitionIds); + bool RequestRecords(); void SendRecords(); @@ -103,7 +106,7 @@ protected: ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } - void CreateSenders(const TVector<ui64>& partitionIds) override; + void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override; void KillSenders() override; void RemoveRecords() override; @@ -135,6 +138,8 @@ private: TSet<TRequestedRecord> PendingBody; TMap<ui64, TChangeRecord> PendingSent; // ui64 is order + TVector<ui64> GonePartitions; + }; // TBaseChangeSender struct TSchemeCacheHelpers { diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 3080f5058a..9154fa6403 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -1930,6 +1930,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { entry.Kind = TableKind; entry.DomainInfo = DomainInfo; + if (Self) { + entry.GeneralVersion = Self->Info.GetVersion().GetGeneralVersion(); + } + if (!CheckColumns(context, entry, KeyColumnTypes, Columns)) { return; } diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index 094a8d1c01..e799a91e29 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -348,6 +348,7 @@ struct TSchemeCacheRequest { EStatus Status = EStatus::Unknown; EKind Kind = EKind::KindUnknown; TIntrusivePtr<TDomainInfo> DomainInfo; + ui64 GeneralVersion = 0; explicit TEntry(THolder<TKeyDesc> keyDesc) : KeyDescription(std::move(keyDesc)) |