aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-06-27 19:01:30 +0300
committerilnaz <ilnaz@ydb.tech>2023-06-27 19:01:30 +0300
commit783d4fe2d9dd78c8202e673ffa89632c2087c6fd (patch)
tree6247f041ddf20c9349b855fb613b9d4777f548f5
parentcd23cf14ba0e02916da11dafb03202741f855ab3 (diff)
downloadydb-783d4fe2d9dd78c8202e673ffa89632c2087c6fd.tar.gz
(refactoring) Prepared change records
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp82
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h4
2 files changed, 52 insertions, 34 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index 4d8ab339b81..f96c6da0b94 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -24,10 +24,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds)
}
for (const auto& [_, sender] : Senders) {
- if (sender.Pending) {
- Enqueued.insert(sender.Pending.begin(), sender.Pending.end());
- }
-
+ ReEnqueueRecords(sender);
ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
}
@@ -130,7 +127,7 @@ void TBaseChangeSender::SendRecords() {
}
auto it = PendingSent.begin();
- THashMap<ui64, TVector<TChangeRecord>> forward;
+ THashSet<ui64> sendTo;
bool needToResolve = false;
while (it != PendingSent.end()) {
@@ -141,31 +138,17 @@ void TBaseChangeSender::SendRecords() {
continue;
}
- const auto& sender = Senders.at(partitionId);
- if (!sender.Ready) {
- ++it;
- continue;
+ auto& sender = Senders.at(partitionId);
+ sender.Prepared.push_back(std::move(it->second));
+ if (sender.Ready) {
+ sendTo.insert(partitionId);
}
- MemUsage -= it->second.GetBody().size();
-
- forward[partitionId].push_back(std::move(it->second));
it = PendingSent.erase(it);
}
- for (auto& [partitionId, records] : forward) {
- Y_VERIFY(Senders.contains(partitionId));
- auto& sender = Senders.at(partitionId);
-
- Y_VERIFY(sender.Ready);
- sender.Ready = false;
-
- sender.Pending.reserve(records.size());
- for (const auto& record : records) {
- sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size());
- }
-
- ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::move(records)));
+ for (const auto partitionId : sendTo) {
+ SendPreparedRecords(partitionId);
}
if (needToResolve && !Resolver->IsResolving()) {
@@ -202,7 +185,9 @@ void TBaseChangeSender::OnReady(ui64 partitionId) {
RemoveRecords(std::exchange(sender.Pending, {}));
}
- SendRecords();
+ if (sender.Prepared) {
+ SendPreparedRecords(partitionId);
+ }
}
void TBaseChangeSender::OnGone(ui64 partitionId) {
@@ -211,11 +196,7 @@ void TBaseChangeSender::OnGone(ui64 partitionId) {
return;
}
- const auto& sender = it->second;
- if (sender.Pending) {
- Enqueued.insert(sender.Pending.begin(), sender.Pending.end());
- }
-
+ ReEnqueueRecords(it->second);
Senders.erase(it);
GonePartitions.push_back(partitionId);
@@ -226,13 +207,41 @@ void TBaseChangeSender::OnGone(ui64 partitionId) {
Resolver->Resolve();
}
+void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
+ Y_VERIFY(Senders.contains(partitionId));
+ auto& sender = Senders.at(partitionId);
+
+ Y_VERIFY(sender.Ready);
+ sender.Ready = false;
+
+ sender.Pending.reserve(sender.Prepared.size());
+ for (const auto& record : sender.Prepared) {
+ sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size());
+ MemUsage -= record.GetBody().size();
+ }
+
+ ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
+}
+
+void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
+ for (const auto& record : sender.Pending) {
+ Enqueued.insert(record);
+ }
+
+ for (const auto& record : sender.Prepared) {
+ Enqueued.emplace(record.GetOrder(), record.GetBody().size());
+ MemUsage -= record.GetBody().size();
+ }
+}
+
void TBaseChangeSender::RemoveRecords() {
- ui64 pendingStatus = 0;
+ ui64 pending = 0;
for (const auto& [_, sender] : Senders) {
- pendingStatus += sender.Pending.size();
+ pending += sender.Pending.size();
+ pending += sender.Prepared.size();
}
- TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pendingStatus));
+ TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pending));
for (const auto& record : std::exchange(Enqueued, {})) {
remove.push_back(record.Order);
@@ -250,6 +259,9 @@ void TBaseChangeSender::RemoveRecords() {
for (const auto& record : sender.Pending) {
remove.push_back(record.Order);
}
+ for (const auto& record : sender.Prepared) {
+ remove.push_back(record.GetOrder());
+ }
}
if (remove) {
@@ -307,6 +319,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TABLEH() { html << "PartitionId"; }
TABLEH() { html << "Ready"; }
TABLEH() { html << "Pending"; }
+ TABLEH() { html << "Prepared"; }
TABLEH() { html << "Actor"; }
}
}
@@ -318,6 +331,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TABLED() { html << partitionId; }
TABLED() { html << sender.Ready; }
TABLED() { html << sender.Pending.size(); }
+ TABLED() { html << sender.Prepared.size(); }
TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); }
}
}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index 26f5f6efedc..5d00e60af24 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -87,6 +87,7 @@ class TBaseChangeSender: public IChangeSender {
TActorId ActorId;
bool Ready = false;
TVector<TEnqueuedRecord> Pending;
+ TVector<TChangeRecord> Prepared;
};
void CreateMissingSenders(const TVector<ui64>& partitionIds);
@@ -95,6 +96,9 @@ class TBaseChangeSender: public IChangeSender {
bool RequestRecords();
void SendRecords();
+ void SendPreparedRecords(ui64 partitionId);
+ void ReEnqueueRecords(const TSender& sender);
+
protected:
template <typename T>
void RemoveRecords(TVector<T>&& records) {