diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-06-28 14:55:31 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-06-28 14:55:31 +0300 |
commit | 307c846614da08f10775d15b456ddfc270272b39 (patch) | |
tree | fa0c97ea987186f5b9dc8243c592ee8bd037196d | |
parent | 938064d942bb0c4149dd0118ad5e0ddde2297783 (diff) | |
download | ydb-307c846614da08f10775d15b456ddfc270272b39.tar.gz |
Introduce broadcast change records
-rw-r--r-- | ydb/core/protos/change_exchange.proto | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 196 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 23 |
5 files changed, 210 insertions, 35 deletions
diff --git a/ydb/core/protos/change_exchange.proto b/ydb/core/protos/change_exchange.proto index e9fc976fd3..771c9f4a81 100644 --- a/ydb/core/protos/change_exchange.proto +++ b/ydb/core/protos/change_exchange.proto @@ -33,6 +33,9 @@ message TDataChange { } } +message TCdcHeartbeat { +} + message TChangeRecord { optional uint64 Order = 1; optional uint64 Group = 2; @@ -43,7 +46,7 @@ message TChangeRecord { oneof Kind { TDataChange AsyncIndex = 7; TDataChange CdcDataChange = 8; - // TSchemaChange SchemaChange = 9; + TCdcHeartbeat CdcHeartbeat = 9; } } diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 8451810d49..3aa7306c7d 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -33,6 +33,9 @@ void TChangeRecord::SerializeToProto(NKikimrChangeExchange::TChangeRecord& recor Y_VERIFY(record.MutableCdcDataChange()->ParseFromArray(Body.data(), Body.size())); break; } + case EKind::CdcHeartbeat: { + break; + } } } @@ -347,6 +350,10 @@ TConstArrayRef<TCell> TChangeRecord::GetKey() const { Key.ConstructInPlace(key.GetCells()); break; } + + case EKind::CdcHeartbeat: { + Y_FAIL("Not supported"); + } } Y_VERIFY(Key); @@ -375,7 +382,8 @@ TString TChangeRecord::GetPartitionKey() const { break; } - case EKind::AsyncIndex: { + case EKind::AsyncIndex: + case EKind::CdcHeartbeat: { Y_FAIL("Not supported"); } } @@ -390,6 +398,15 @@ TInstant TChangeRecord::GetApproximateCreationDateTime() const { : TInstant::MilliSeconds(GetStep()); } +bool TChangeRecord::IsBroadcast() const { + switch (Kind) { + case EKind::CdcHeartbeat: + return true; + default: + return false; + } +} + TString TChangeRecord::ToString() const { TString result; TStringOutput out(result); diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index a9b74a6549..ae3ea88663 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -25,6 +25,7 @@ public: enum class EKind: ui8 { AsyncIndex, CdcDataChange, + CdcHeartbeat, }; struct TAwsJsonOptions { @@ -55,6 +56,7 @@ public: i64 GetSeqNo() const; TString GetPartitionKey() const; TInstant GetApproximateCreationDateTime() const; + bool IsBroadcast() const; TString ToString() const; void Out(IOutputStream& out) const; diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index f96c6da0b9..143ce8d90b 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -4,10 +4,24 @@ #include <library/cpp/monlib/service/pages/mon_page.h> #include <library/cpp/monlib/service/pages/templates.h> +#include <util/generic/algorithm.h> #include <util/generic/size_literals.h> namespace NKikimr::NDataShard { +void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId) { + Y_VERIFY(!senders.contains(partitionId)); + auto& sender = senders[partitionId]; + sender.ActorId = ActorOps->Register(CreateSender(partitionId)); + + for (const auto& [order, broadcast] : Broadcasting) { + if (AddBroadcastPartition(order, partitionId)) { + // re-schedule record to send it in the correct order + PendingSent.emplace(order, broadcast.Record); + } + } +} + void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) { THashMap<ui64, TSender> senders; @@ -17,14 +31,14 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) senders.emplace(partitionId, std::move(it->second)); Senders.erase(it); } else { - Y_VERIFY(!senders.contains(partitionId)); - auto& sender = senders[partitionId]; - sender.ActorId = ActorOps->Register(CreateSender(partitionId)); + RegisterSender(senders, partitionId); } } - for (const auto& [_, sender] : Senders) { + for (const auto& [partitionId, sender] : Senders) { ReEnqueueRecords(sender); + ProcessBroadcasting(&TBaseChangeSender::RemoveBroadcastPartition, + partitionId, sender.Broadcasting); ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); } @@ -33,9 +47,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) void TBaseChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) { for (const auto& partitionId : partitionIds) { - Y_VERIFY(!Senders.contains(partitionId)); - auto& sender = Senders[partitionId]; - sender.ActorId = ActorOps->Register(CreateSender(partitionId)); + RegisterSender(Senders, partitionId); } } @@ -131,17 +143,34 @@ void TBaseChangeSender::SendRecords() { bool needToResolve = false; while (it != PendingSent.end()) { - const ui64 partitionId = Resolver->GetPartitionId(it->second); - if (!Senders.contains(partitionId)) { - needToResolve = true; - ++it; - continue; - } + if (!it->second.IsBroadcast()) { + const ui64 partitionId = Resolver->GetPartitionId(it->second); + if (!Senders.contains(partitionId)) { + needToResolve = true; + ++it; + continue; + } + + auto& sender = Senders.at(partitionId); + sender.Prepared.push_back(std::move(it->second)); + if (sender.Ready) { + sendTo.insert(partitionId); + } + } else { + auto& broadcast = EnsureBroadcast(it->second); + EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) { + if (Senders.contains(partitionId)) { + auto& sender = Senders.at(partitionId); + sender.Prepared.push_back(std::move(it->second)); + if (sender.Ready) { + sendTo.insert(partitionId); + } + + return true; + } - auto& sender = Senders.at(partitionId); - sender.Prepared.push_back(std::move(it->second)); - if (sender.Ready) { - sendTo.insert(partitionId); + return false; + }); } it = PendingSent.erase(it); @@ -185,6 +214,11 @@ void TBaseChangeSender::OnReady(ui64 partitionId) { RemoveRecords(std::exchange(sender.Pending, {})); } + if (sender.Broadcasting) { + ProcessBroadcasting(&TBaseChangeSender::CompleteBroadcastPartition, + partitionId, std::exchange(sender.Broadcasting, {})); + } + if (sender.Prepared) { SendPreparedRecords(partitionId); } @@ -216,8 +250,12 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) { sender.Pending.reserve(sender.Prepared.size()); for (const auto& record : sender.Prepared) { - sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size()); - MemUsage -= record.GetBody().size(); + if (!record.IsBroadcast()) { + sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size()); + MemUsage -= record.GetBody().size(); + } else { + sender.Broadcasting.push_back(record.GetOrder()); + } } ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); @@ -229,43 +267,135 @@ void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) { } for (const auto& record : sender.Prepared) { - Enqueued.emplace(record.GetOrder(), record.GetBody().size()); - MemUsage -= record.GetBody().size(); + if (!record.IsBroadcast()) { + Enqueued.emplace(record.GetOrder(), record.GetBody().size()); + MemUsage -= record.GetBody().size(); + } } } -void TBaseChangeSender::RemoveRecords() { - ui64 pending = 0; - for (const auto& [_, sender] : Senders) { - pending += sender.Pending.size(); - pending += sender.Prepared.size(); +TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(const TChangeRecord& record) { + Y_VERIFY(record.IsBroadcast()); + + auto it = Broadcasting.find(record.GetOrder()); + if (it != Broadcasting.end()) { + return it->second; + } + + THashSet<ui64> partitionIds; + for (const auto& [partitionId, _] : Senders) { + partitionIds.insert(partitionId); + } + for (const auto partitionId : GonePartitions) { + partitionIds.insert(partitionId); + } + + auto res = Broadcasting.emplace(record.GetOrder(), TBroadcast{ + .Record = record, + .Partitions = partitionIds, + .PendingPartitions = partitionIds, + }); + + return res.first->second; +} + +bool TBaseChangeSender::AddBroadcastPartition(ui64 order, ui64 partitionId) { + auto it = Broadcasting.find(order); + Y_VERIFY(it != Broadcasting.end()); + + auto& broadcast = it->second; + if (broadcast.Partitions.contains(partitionId)) { + return false; + } + + broadcast.Partitions.insert(partitionId); + broadcast.PendingPartitions.insert(partitionId); + + return true; +} + +bool TBaseChangeSender::RemoveBroadcastPartition(ui64 order, ui64 partitionId) { + auto it = Broadcasting.find(order); + Y_VERIFY(it != Broadcasting.end()); + + auto& broadcast = it->second; + broadcast.Partitions.erase(partitionId); + broadcast.PendingPartitions.erase(partitionId); + broadcast.CompletedPartitions.erase(partitionId); + + return MaybeCompleteBroadcast(order); +} + +bool TBaseChangeSender::CompleteBroadcastPartition(ui64 order, ui64 partitionId) { + auto it = Broadcasting.find(order); + Y_VERIFY(it != Broadcasting.end()); + + auto& broadcast = it->second; + broadcast.CompletedPartitions.insert(partitionId); + + return MaybeCompleteBroadcast(order); +} + +bool TBaseChangeSender::MaybeCompleteBroadcast(ui64 order) { + auto it = Broadcasting.find(order); + Y_VERIFY(it != Broadcasting.end()); + + auto& broadcast = it->second; + if (broadcast.PendingPartitions || broadcast.Partitions.size() != broadcast.CompletedPartitions.size()) { + return false; + } + + MemUsage -= broadcast.Record.GetBody().size(); + Broadcasting.erase(it); + + return true; +} + +void TBaseChangeSender::ProcessBroadcasting(std::function<bool(TBaseChangeSender*, ui64, ui64)> f, + ui64 partitionId, const TVector<ui64>& broadcasting) +{ + TVector<ui64> remove; + for (const auto order : broadcasting) { + if (std::invoke(f, this, order, partitionId)) { + remove.push_back(order); + } + } + + if (remove) { + RemoveRecords(std::move(remove)); } +} - TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pending)); +void TBaseChangeSender::RemoveRecords() { + THashSet<ui64> remove; for (const auto& record : std::exchange(Enqueued, {})) { - remove.push_back(record.Order); + remove.insert(record.Order); } for (const auto& record : std::exchange(PendingBody, {})) { - remove.push_back(record.Order); + remove.insert(record.Order); } for (const auto& [order, _] : std::exchange(PendingSent, {})) { - remove.push_back(order); + remove.insert(order); + } + + for (const auto& [order, _] : std::exchange(Broadcasting, {})) { + remove.insert(order); } for (const auto& [_, sender] : Senders) { for (const auto& record : sender.Pending) { - remove.push_back(record.Order); + remove.insert(record.Order); } for (const auto& record : sender.Prepared) { - remove.push_back(record.GetOrder()); + remove.insert(record.GetOrder()); } } if (remove) { - ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + RemoveRecords(TVector<ui64>(remove.begin(), remove.end())); } } diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 5d00e60af2..28c6648876 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -88,8 +88,17 @@ class TBaseChangeSender: public IChangeSender { bool Ready = false; TVector<TEnqueuedRecord> Pending; TVector<TChangeRecord> Prepared; + TVector<ui64> Broadcasting; }; + struct TBroadcast { + const TChangeRecord Record; + THashSet<ui64> Partitions; + THashSet<ui64> PendingPartitions; + THashSet<ui64> CompletedPartitions; + }; + + void RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId); void CreateMissingSenders(const TVector<ui64>& partitionIds); void RecreateSenders(const TVector<ui64>& partitionIds); @@ -99,6 +108,14 @@ class TBaseChangeSender: public IChangeSender { void SendPreparedRecords(ui64 partitionId); void ReEnqueueRecords(const TSender& sender); + TBroadcast& EnsureBroadcast(const TChangeRecord& record); + bool AddBroadcastPartition(ui64 order, ui64 partitionId); + bool RemoveBroadcastPartition(ui64 order, ui64 partitionId); + bool CompleteBroadcastPartition(ui64 order, ui64 partitionId); + bool MaybeCompleteBroadcast(ui64 order); + void ProcessBroadcasting(std::function<bool(TBaseChangeSender*, ui64, ui64)> f, + ui64 partitionId, const TVector<ui64>& broadcasting); + protected: template <typename T> void RemoveRecords(TVector<T>&& records) { @@ -110,6 +127,11 @@ protected: ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } + template <> + void RemoveRecords(TVector<ui64>&& records) { + ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(records))); + } + void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override; void KillSenders() override; void RemoveRecords() override; @@ -141,6 +163,7 @@ private: TSet<TEnqueuedRecord> Enqueued; TSet<TRequestedRecord> PendingBody; TMap<ui64, TChangeRecord> PendingSent; // ui64 is order + THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order TVector<ui64> GonePartitions; |