aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-06-28 17:20:12 +0300
committerilnaz <ilnaz@ydb.tech>2023-06-28 17:20:12 +0300
commitf41f90eacc1ab531dbc05cf10ced62bb31d333be (patch)
treefc4c00552dbbf5f0952567a255bdea7e1d4c41fe
parentc41cbb372d40fdd4d243b98b76c86308cf8c8dfd (diff)
downloadydb-f41f90eacc1ab531dbc05cf10ced62bb31d333be.tar.gz
Resend broadcast records in correct order
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp13
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h2
2 files changed, 9 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index 143ce8d90b0..d2fd11f603b 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -16,8 +16,8 @@ void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 pa
for (const auto& [order, broadcast] : Broadcasting) {
if (AddBroadcastPartition(order, partitionId)) {
- // re-schedule record to send it in the correct order
- PendingSent.emplace(order, broadcast.Record);
+ // re-enqueue record to send it in the correct order
+ Enqueued.insert(broadcast.Record);
}
}
}
@@ -122,6 +122,11 @@ void TBaseChangeSender::ProcessRecords(TVector<TChangeRecord>&& records) {
MemUsage += record.GetBody().size();
}
+ if (record.IsBroadcast()) {
+ // assume that broadcast records are too small to affect memory consumption
+ MemUsage -= record.GetBody().size();
+ }
+
PendingSent.emplace(record.GetOrder(), std::move(record));
PendingBody.erase(it);
}
@@ -291,7 +296,7 @@ TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(const TChangeR
}
auto res = Broadcasting.emplace(record.GetOrder(), TBroadcast{
- .Record = record,
+ .Record = {record.GetOrder(), record.GetBody().size()},
.Partitions = partitionIds,
.PendingPartitions = partitionIds,
});
@@ -345,9 +350,7 @@ bool TBaseChangeSender::MaybeCompleteBroadcast(ui64 order) {
return false;
}
- MemUsage -= broadcast.Record.GetBody().size();
Broadcasting.erase(it);
-
return true;
}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index 28c66488769..09856d460bf 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -92,7 +92,7 @@ class TBaseChangeSender: public IChangeSender {
};
struct TBroadcast {
- const TChangeRecord Record;
+ const TEnqueuedRecord Record;
THashSet<ui64> Partitions;
THashSet<ui64> PendingPartitions;
THashSet<ui64> CompletedPartitions;