diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-06-28 17:20:12 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-06-28 17:20:12 +0300 |
commit | f41f90eacc1ab531dbc05cf10ced62bb31d333be (patch) | |
tree | fc4c00552dbbf5f0952567a255bdea7e1d4c41fe | |
parent | c41cbb372d40fdd4d243b98b76c86308cf8c8dfd (diff) | |
download | ydb-f41f90eacc1ab531dbc05cf10ced62bb31d333be.tar.gz |
Resend broadcast records in correct order
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 2 |
2 files changed, 9 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index 143ce8d90b0..d2fd11f603b 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -16,8 +16,8 @@ void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 pa for (const auto& [order, broadcast] : Broadcasting) { if (AddBroadcastPartition(order, partitionId)) { - // re-schedule record to send it in the correct order - PendingSent.emplace(order, broadcast.Record); + // re-enqueue record to send it in the correct order + Enqueued.insert(broadcast.Record); } } } @@ -122,6 +122,11 @@ void TBaseChangeSender::ProcessRecords(TVector<TChangeRecord>&& records) { MemUsage += record.GetBody().size(); } + if (record.IsBroadcast()) { + // assume that broadcast records are too small to affect memory consumption + MemUsage -= record.GetBody().size(); + } + PendingSent.emplace(record.GetOrder(), std::move(record)); PendingBody.erase(it); } @@ -291,7 +296,7 @@ TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(const TChangeR } auto res = Broadcasting.emplace(record.GetOrder(), TBroadcast{ - .Record = record, + .Record = {record.GetOrder(), record.GetBody().size()}, .Partitions = partitionIds, .PendingPartitions = partitionIds, }); @@ -345,9 +350,7 @@ bool TBaseChangeSender::MaybeCompleteBroadcast(ui64 order) { return false; } - MemUsage -= broadcast.Record.GetBody().size(); Broadcasting.erase(it); - return true; } diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 28c66488769..09856d460bf 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -92,7 +92,7 @@ class TBaseChangeSender: public IChangeSender { }; struct TBroadcast { - const TChangeRecord Record; + const TEnqueuedRecord Record; THashSet<ui64> Partitions; THashSet<ui64> PendingPartitions; THashSet<ui64> CompletedPartitions; |