diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-06-27 19:01:30 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-06-27 19:01:30 +0300 |
commit | 783d4fe2d9dd78c8202e673ffa89632c2087c6fd (patch) | |
tree | 6247f041ddf20c9349b855fb613b9d4777f548f5 | |
parent | cd23cf14ba0e02916da11dafb03202741f855ab3 (diff) | |
download | ydb-783d4fe2d9dd78c8202e673ffa89632c2087c6fd.tar.gz |
(refactoring) Prepared change records
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 82 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 4 |
2 files changed, 52 insertions, 34 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index 4d8ab339b81..f96c6da0b94 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -24,10 +24,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) } for (const auto& [_, sender] : Senders) { - if (sender.Pending) { - Enqueued.insert(sender.Pending.begin(), sender.Pending.end()); - } - + ReEnqueueRecords(sender); ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); } @@ -130,7 +127,7 @@ void TBaseChangeSender::SendRecords() { } auto it = PendingSent.begin(); - THashMap<ui64, TVector<TChangeRecord>> forward; + THashSet<ui64> sendTo; bool needToResolve = false; while (it != PendingSent.end()) { @@ -141,31 +138,17 @@ void TBaseChangeSender::SendRecords() { continue; } - const auto& sender = Senders.at(partitionId); - if (!sender.Ready) { - ++it; - continue; + auto& sender = Senders.at(partitionId); + sender.Prepared.push_back(std::move(it->second)); + if (sender.Ready) { + sendTo.insert(partitionId); } - MemUsage -= it->second.GetBody().size(); - - forward[partitionId].push_back(std::move(it->second)); it = PendingSent.erase(it); } - for (auto& [partitionId, records] : forward) { - Y_VERIFY(Senders.contains(partitionId)); - auto& sender = Senders.at(partitionId); - - Y_VERIFY(sender.Ready); - sender.Ready = false; - - sender.Pending.reserve(records.size()); - for (const auto& record : records) { - sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size()); - } - - ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::move(records))); + for (const auto partitionId : sendTo) { + SendPreparedRecords(partitionId); } if (needToResolve && !Resolver->IsResolving()) { @@ -202,7 +185,9 @@ void TBaseChangeSender::OnReady(ui64 partitionId) { RemoveRecords(std::exchange(sender.Pending, {})); } - SendRecords(); + if (sender.Prepared) { + SendPreparedRecords(partitionId); + } } void TBaseChangeSender::OnGone(ui64 partitionId) { @@ -211,11 +196,7 @@ void TBaseChangeSender::OnGone(ui64 partitionId) { return; } - const auto& sender = it->second; - if (sender.Pending) { - Enqueued.insert(sender.Pending.begin(), sender.Pending.end()); - } - + ReEnqueueRecords(it->second); Senders.erase(it); GonePartitions.push_back(partitionId); @@ -226,13 +207,41 @@ void TBaseChangeSender::OnGone(ui64 partitionId) { Resolver->Resolve(); } +void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) { + Y_VERIFY(Senders.contains(partitionId)); + auto& sender = Senders.at(partitionId); + + Y_VERIFY(sender.Ready); + sender.Ready = false; + + sender.Pending.reserve(sender.Prepared.size()); + for (const auto& record : sender.Prepared) { + sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size()); + MemUsage -= record.GetBody().size(); + } + + ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); +} + +void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { + for (const auto& record : sender.Pending) { + Enqueued.insert(record); + } + + for (const auto& record : sender.Prepared) { + Enqueued.emplace(record.GetOrder(), record.GetBody().size()); + MemUsage -= record.GetBody().size(); + } +} + void TBaseChangeSender::RemoveRecords() { - ui64 pendingStatus = 0; + ui64 pending = 0; for (const auto& [_, sender] : Senders) { - pendingStatus += sender.Pending.size(); + pending += sender.Pending.size(); + pending += sender.Prepared.size(); } - TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pendingStatus)); + TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pending)); for (const auto& record : std::exchange(Enqueued, {})) { remove.push_back(record.Order); @@ -250,6 +259,9 @@ void TBaseChangeSender::RemoveRecords() { for (const auto& record : sender.Pending) { remove.push_back(record.Order); } + for (const auto& record : sender.Prepared) { + remove.push_back(record.GetOrder()); + } } if (remove) { @@ -307,6 +319,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TABLEH() { html << "PartitionId"; } TABLEH() { html << "Ready"; } TABLEH() { html << "Pending"; } + TABLEH() { html << "Prepared"; } TABLEH() { html << "Actor"; } } } @@ -318,6 +331,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TABLED() { html << partitionId; } TABLED() { html << sender.Ready; } TABLED() { html << sender.Pending.size(); } + TABLED() { html << sender.Prepared.size(); } TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); } } } diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 26f5f6efedc..5d00e60af24 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -87,6 +87,7 @@ class TBaseChangeSender: public IChangeSender { TActorId ActorId; bool Ready = false; TVector<TEnqueuedRecord> Pending; + TVector<TChangeRecord> Prepared; }; void CreateMissingSenders(const TVector<ui64>& partitionIds); @@ -95,6 +96,9 @@ class TBaseChangeSender: public IChangeSender { bool RequestRecords(); void SendRecords(); + void SendPreparedRecords(ui64 partitionId); + void ReEnqueueRecords(const TSender& sender); + protected: template <typename T> void RemoveRecords(TVector<T>&& records) { |