aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-06-20 14:15:06 +0300
committerilnaz <ilnaz@ydb.tech>2023-06-20 14:15:06 +0300
commitd72d3725bf30350da6b32682f936dc20f6b984eb (patch)
tree4534a985c9955bad52abe96dcad2bdac1d63fafa
parent5e86de883f882a9b9af25a9f211ae75e55db0702 (diff)
downloadydb-d72d3725bf30350da6b32682f936dc20f6b984eb.tar.gz
Recreate gone actors if version was not changed
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp7
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp21
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h9
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp4
-rw-r--r--ydb/core/tx/scheme_cache/scheme_cache.h1
5 files changed, 38 insertions, 4 deletions
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 99a25a8b59..09a63971a0 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -624,8 +624,11 @@ class TAsyncIndexChangeSenderMain
return Retry();
}
+ const bool versionChanged = !IndexTableVersion || IndexTableVersion != entry.GeneralVersion;
+ IndexTableVersion = entry.GeneralVersion;
+
KeyDesc = std::move(entry.KeyDescription);
- CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()));
+ CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
Become(&TThis::StateMain);
}
@@ -723,6 +726,7 @@ public:
: TActorBootstrapped()
, TBaseChangeSender(this, this, dataShard, indexPathId)
, UserTableId(userTableId)
+ , IndexTableVersion(0)
{
}
@@ -751,6 +755,7 @@ private:
TMap<TTag, TTag> TagMap; // from main to index
TPathId IndexTablePathId;
+ ui64 IndexTableVersion;
THolder<TKeyDesc> KeyDesc;
}; // TAsyncIndexChangeSenderMain
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index 68094c6dad..4d8ab339b8 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -8,7 +8,7 @@
namespace NKikimr::NDataShard {
-void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
+void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) {
THashMap<ui64, TSender> senders;
for (const auto& partitionId : partitionIds) {
@@ -32,6 +32,24 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
}
Senders = std::move(senders);
+}
+
+void TBaseChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) {
+ for (const auto& partitionId : partitionIds) {
+ Y_VERIFY(!Senders.contains(partitionId));
+ auto& sender = Senders[partitionId];
+ sender.ActorId = ActorOps->Register(CreateSender(partitionId));
+ }
+}
+
+void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged) {
+ if (partitioningChanged) {
+ CreateMissingSenders(partitionIds);
+ } else {
+ RecreateSenders(GonePartitions);
+ }
+
+ GonePartitions.clear();
if (!Enqueued || !RequestRecords()) {
SendRecords();
@@ -199,6 +217,7 @@ void TBaseChangeSender::OnGone(ui64 partitionId) {
}
Senders.erase(it);
+ GonePartitions.push_back(partitionId);
if (Resolver->IsResolving()) {
return;
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index a5de292ecd..26f5f6efed 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -57,7 +57,7 @@ class IChangeSender {
public:
virtual ~IChangeSender() = default;
- virtual void CreateSenders(const TVector<ui64>& partitionIds) = 0;
+ virtual void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) = 0;
virtual void KillSenders() = 0;
virtual IActor* CreateSender(ui64 partitionId) = 0;
virtual void RemoveRecords() = 0;
@@ -89,6 +89,9 @@ class TBaseChangeSender: public IChangeSender {
TVector<TEnqueuedRecord> Pending;
};
+ void CreateMissingSenders(const TVector<ui64>& partitionIds);
+ void RecreateSenders(const TVector<ui64>& partitionIds);
+
bool RequestRecords();
void SendRecords();
@@ -103,7 +106,7 @@ protected:
ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}
- void CreateSenders(const TVector<ui64>& partitionIds) override;
+ void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
void KillSenders() override;
void RemoveRecords() override;
@@ -135,6 +138,8 @@ private:
TSet<TRequestedRecord> PendingBody;
TMap<ui64, TChangeRecord> PendingSent; // ui64 is order
+ TVector<ui64> GonePartitions;
+
}; // TBaseChangeSender
struct TSchemeCacheHelpers {
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 3080f5058a..9154fa6403 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -1930,6 +1930,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
entry.Kind = TableKind;
entry.DomainInfo = DomainInfo;
+ if (Self) {
+ entry.GeneralVersion = Self->Info.GetVersion().GetGeneralVersion();
+ }
+
if (!CheckColumns(context, entry, KeyColumnTypes, Columns)) {
return;
}
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h
index 094a8d1c01..e799a91e29 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.h
+++ b/ydb/core/tx/scheme_cache/scheme_cache.h
@@ -348,6 +348,7 @@ struct TSchemeCacheRequest {
EStatus Status = EStatus::Unknown;
EKind Kind = EKind::KindUnknown;
TIntrusivePtr<TDomainInfo> DomainInfo;
+ ui64 GeneralVersion = 0;
explicit TEntry(THolder<TKeyDesc> keyDesc)
: KeyDescription(std::move(keyDesc))