aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-06-28 14:55:31 +0300
committerilnaz <ilnaz@ydb.tech>2023-06-28 14:55:31 +0300
commit307c846614da08f10775d15b456ddfc270272b39 (patch)
treefa0c97ea987186f5b9dc8243c592ee8bd037196d
parent938064d942bb0c4149dd0118ad5e0ddde2297783 (diff)
downloadydb-307c846614da08f10775d15b456ddfc270272b39.tar.gz
Introduce broadcast change records
-rw-r--r--ydb/core/protos/change_exchange.proto5
-rw-r--r--ydb/core/tx/datashard/change_record.cpp19
-rw-r--r--ydb/core/tx/datashard/change_record.h2
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp196
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h23
5 files changed, 210 insertions, 35 deletions
diff --git a/ydb/core/protos/change_exchange.proto b/ydb/core/protos/change_exchange.proto
index e9fc976fd3..771c9f4a81 100644
--- a/ydb/core/protos/change_exchange.proto
+++ b/ydb/core/protos/change_exchange.proto
@@ -33,6 +33,9 @@ message TDataChange {
}
}
+message TCdcHeartbeat {
+}
+
message TChangeRecord {
optional uint64 Order = 1;
optional uint64 Group = 2;
@@ -43,7 +46,7 @@ message TChangeRecord {
oneof Kind {
TDataChange AsyncIndex = 7;
TDataChange CdcDataChange = 8;
- // TSchemaChange SchemaChange = 9;
+ TCdcHeartbeat CdcHeartbeat = 9;
}
}
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 8451810d49..3aa7306c7d 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -33,6 +33,9 @@ void TChangeRecord::SerializeToProto(NKikimrChangeExchange::TChangeRecord& recor
Y_VERIFY(record.MutableCdcDataChange()->ParseFromArray(Body.data(), Body.size()));
break;
}
+ case EKind::CdcHeartbeat: {
+ break;
+ }
}
}
@@ -347,6 +350,10 @@ TConstArrayRef<TCell> TChangeRecord::GetKey() const {
Key.ConstructInPlace(key.GetCells());
break;
}
+
+ case EKind::CdcHeartbeat: {
+ Y_FAIL("Not supported");
+ }
}
Y_VERIFY(Key);
@@ -375,7 +382,8 @@ TString TChangeRecord::GetPartitionKey() const {
break;
}
- case EKind::AsyncIndex: {
+ case EKind::AsyncIndex:
+ case EKind::CdcHeartbeat: {
Y_FAIL("Not supported");
}
}
@@ -390,6 +398,15 @@ TInstant TChangeRecord::GetApproximateCreationDateTime() const {
: TInstant::MilliSeconds(GetStep());
}
+bool TChangeRecord::IsBroadcast() const {
+ switch (Kind) {
+ case EKind::CdcHeartbeat:
+ return true;
+ default:
+ return false;
+ }
+}
+
TString TChangeRecord::ToString() const {
TString result;
TStringOutput out(result);
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index a9b74a6549..ae3ea88663 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -25,6 +25,7 @@ public:
enum class EKind: ui8 {
AsyncIndex,
CdcDataChange,
+ CdcHeartbeat,
};
struct TAwsJsonOptions {
@@ -55,6 +56,7 @@ public:
i64 GetSeqNo() const;
TString GetPartitionKey() const;
TInstant GetApproximateCreationDateTime() const;
+ bool IsBroadcast() const;
TString ToString() const;
void Out(IOutputStream& out) const;
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index f96c6da0b9..143ce8d90b 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -4,10 +4,24 @@
#include <library/cpp/monlib/service/pages/mon_page.h>
#include <library/cpp/monlib/service/pages/templates.h>
+#include <util/generic/algorithm.h>
#include <util/generic/size_literals.h>
namespace NKikimr::NDataShard {
+void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
+ Y_VERIFY(!senders.contains(partitionId));
+ auto& sender = senders[partitionId];
+ sender.ActorId = ActorOps->Register(CreateSender(partitionId));
+
+ for (const auto& [order, broadcast] : Broadcasting) {
+ if (AddBroadcastPartition(order, partitionId)) {
+ // re-schedule record to send it in the correct order
+ PendingSent.emplace(order, broadcast.Record);
+ }
+ }
+}
+
void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) {
THashMap<ui64, TSender> senders;
@@ -17,14 +31,14 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds)
senders.emplace(partitionId, std::move(it->second));
Senders.erase(it);
} else {
- Y_VERIFY(!senders.contains(partitionId));
- auto& sender = senders[partitionId];
- sender.ActorId = ActorOps->Register(CreateSender(partitionId));
+ RegisterSender(senders, partitionId);
}
}
- for (const auto& [_, sender] : Senders) {
+ for (const auto& [partitionId, sender] : Senders) {
ReEnqueueRecords(sender);
+ ProcessBroadcasting(&TBaseChangeSender::RemoveBroadcastPartition,
+ partitionId, sender.Broadcasting);
ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
}
@@ -33,9 +47,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds)
void TBaseChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) {
for (const auto& partitionId : partitionIds) {
- Y_VERIFY(!Senders.contains(partitionId));
- auto& sender = Senders[partitionId];
- sender.ActorId = ActorOps->Register(CreateSender(partitionId));
+ RegisterSender(Senders, partitionId);
}
}
@@ -131,17 +143,34 @@ void TBaseChangeSender::SendRecords() {
bool needToResolve = false;
while (it != PendingSent.end()) {
- const ui64 partitionId = Resolver->GetPartitionId(it->second);
- if (!Senders.contains(partitionId)) {
- needToResolve = true;
- ++it;
- continue;
- }
+ if (!it->second.IsBroadcast()) {
+ const ui64 partitionId = Resolver->GetPartitionId(it->second);
+ if (!Senders.contains(partitionId)) {
+ needToResolve = true;
+ ++it;
+ continue;
+ }
+
+ auto& sender = Senders.at(partitionId);
+ sender.Prepared.push_back(std::move(it->second));
+ if (sender.Ready) {
+ sendTo.insert(partitionId);
+ }
+ } else {
+ auto& broadcast = EnsureBroadcast(it->second);
+ EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) {
+ if (Senders.contains(partitionId)) {
+ auto& sender = Senders.at(partitionId);
+ sender.Prepared.push_back(std::move(it->second));
+ if (sender.Ready) {
+ sendTo.insert(partitionId);
+ }
+
+ return true;
+ }
- auto& sender = Senders.at(partitionId);
- sender.Prepared.push_back(std::move(it->second));
- if (sender.Ready) {
- sendTo.insert(partitionId);
+ return false;
+ });
}
it = PendingSent.erase(it);
@@ -185,6 +214,11 @@ void TBaseChangeSender::OnReady(ui64 partitionId) {
RemoveRecords(std::exchange(sender.Pending, {}));
}
+ if (sender.Broadcasting) {
+ ProcessBroadcasting(&TBaseChangeSender::CompleteBroadcastPartition,
+ partitionId, std::exchange(sender.Broadcasting, {}));
+ }
+
if (sender.Prepared) {
SendPreparedRecords(partitionId);
}
@@ -216,8 +250,12 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
sender.Pending.reserve(sender.Prepared.size());
for (const auto& record : sender.Prepared) {
- sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size());
- MemUsage -= record.GetBody().size();
+ if (!record.IsBroadcast()) {
+ sender.Pending.emplace_back(record.GetOrder(), record.GetBody().size());
+ MemUsage -= record.GetBody().size();
+ } else {
+ sender.Broadcasting.push_back(record.GetOrder());
+ }
}
ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
@@ -229,43 +267,135 @@ void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
}
for (const auto& record : sender.Prepared) {
- Enqueued.emplace(record.GetOrder(), record.GetBody().size());
- MemUsage -= record.GetBody().size();
+ if (!record.IsBroadcast()) {
+ Enqueued.emplace(record.GetOrder(), record.GetBody().size());
+ MemUsage -= record.GetBody().size();
+ }
}
}
-void TBaseChangeSender::RemoveRecords() {
- ui64 pending = 0;
- for (const auto& [_, sender] : Senders) {
- pending += sender.Pending.size();
- pending += sender.Prepared.size();
+TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(const TChangeRecord& record) {
+ Y_VERIFY(record.IsBroadcast());
+
+ auto it = Broadcasting.find(record.GetOrder());
+ if (it != Broadcasting.end()) {
+ return it->second;
+ }
+
+ THashSet<ui64> partitionIds;
+ for (const auto& [partitionId, _] : Senders) {
+ partitionIds.insert(partitionId);
+ }
+ for (const auto partitionId : GonePartitions) {
+ partitionIds.insert(partitionId);
+ }
+
+ auto res = Broadcasting.emplace(record.GetOrder(), TBroadcast{
+ .Record = record,
+ .Partitions = partitionIds,
+ .PendingPartitions = partitionIds,
+ });
+
+ return res.first->second;
+}
+
+bool TBaseChangeSender::AddBroadcastPartition(ui64 order, ui64 partitionId) {
+ auto it = Broadcasting.find(order);
+ Y_VERIFY(it != Broadcasting.end());
+
+ auto& broadcast = it->second;
+ if (broadcast.Partitions.contains(partitionId)) {
+ return false;
+ }
+
+ broadcast.Partitions.insert(partitionId);
+ broadcast.PendingPartitions.insert(partitionId);
+
+ return true;
+}
+
+bool TBaseChangeSender::RemoveBroadcastPartition(ui64 order, ui64 partitionId) {
+ auto it = Broadcasting.find(order);
+ Y_VERIFY(it != Broadcasting.end());
+
+ auto& broadcast = it->second;
+ broadcast.Partitions.erase(partitionId);
+ broadcast.PendingPartitions.erase(partitionId);
+ broadcast.CompletedPartitions.erase(partitionId);
+
+ return MaybeCompleteBroadcast(order);
+}
+
+bool TBaseChangeSender::CompleteBroadcastPartition(ui64 order, ui64 partitionId) {
+ auto it = Broadcasting.find(order);
+ Y_VERIFY(it != Broadcasting.end());
+
+ auto& broadcast = it->second;
+ broadcast.CompletedPartitions.insert(partitionId);
+
+ return MaybeCompleteBroadcast(order);
+}
+
+bool TBaseChangeSender::MaybeCompleteBroadcast(ui64 order) {
+ auto it = Broadcasting.find(order);
+ Y_VERIFY(it != Broadcasting.end());
+
+ auto& broadcast = it->second;
+ if (broadcast.PendingPartitions || broadcast.Partitions.size() != broadcast.CompletedPartitions.size()) {
+ return false;
+ }
+
+ MemUsage -= broadcast.Record.GetBody().size();
+ Broadcasting.erase(it);
+
+ return true;
+}
+
+void TBaseChangeSender::ProcessBroadcasting(std::function<bool(TBaseChangeSender*, ui64, ui64)> f,
+ ui64 partitionId, const TVector<ui64>& broadcasting)
+{
+ TVector<ui64> remove;
+ for (const auto order : broadcasting) {
+ if (std::invoke(f, this, order, partitionId)) {
+ remove.push_back(order);
+ }
+ }
+
+ if (remove) {
+ RemoveRecords(std::move(remove));
}
+}
- TVector<ui64> remove(Reserve(Enqueued.size() + PendingBody.size() + PendingSent.size() + pending));
+void TBaseChangeSender::RemoveRecords() {
+ THashSet<ui64> remove;
for (const auto& record : std::exchange(Enqueued, {})) {
- remove.push_back(record.Order);
+ remove.insert(record.Order);
}
for (const auto& record : std::exchange(PendingBody, {})) {
- remove.push_back(record.Order);
+ remove.insert(record.Order);
}
for (const auto& [order, _] : std::exchange(PendingSent, {})) {
- remove.push_back(order);
+ remove.insert(order);
+ }
+
+ for (const auto& [order, _] : std::exchange(Broadcasting, {})) {
+ remove.insert(order);
}
for (const auto& [_, sender] : Senders) {
for (const auto& record : sender.Pending) {
- remove.push_back(record.Order);
+ remove.insert(record.Order);
}
for (const auto& record : sender.Prepared) {
- remove.push_back(record.GetOrder());
+ remove.insert(record.GetOrder());
}
}
if (remove) {
- ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
+ RemoveRecords(TVector<ui64>(remove.begin(), remove.end()));
}
}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index 5d00e60af2..28c6648876 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -88,8 +88,17 @@ class TBaseChangeSender: public IChangeSender {
bool Ready = false;
TVector<TEnqueuedRecord> Pending;
TVector<TChangeRecord> Prepared;
+ TVector<ui64> Broadcasting;
};
+ struct TBroadcast {
+ const TChangeRecord Record;
+ THashSet<ui64> Partitions;
+ THashSet<ui64> PendingPartitions;
+ THashSet<ui64> CompletedPartitions;
+ };
+
+ void RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId);
void CreateMissingSenders(const TVector<ui64>& partitionIds);
void RecreateSenders(const TVector<ui64>& partitionIds);
@@ -99,6 +108,14 @@ class TBaseChangeSender: public IChangeSender {
void SendPreparedRecords(ui64 partitionId);
void ReEnqueueRecords(const TSender& sender);
+ TBroadcast& EnsureBroadcast(const TChangeRecord& record);
+ bool AddBroadcastPartition(ui64 order, ui64 partitionId);
+ bool RemoveBroadcastPartition(ui64 order, ui64 partitionId);
+ bool CompleteBroadcastPartition(ui64 order, ui64 partitionId);
+ bool MaybeCompleteBroadcast(ui64 order);
+ void ProcessBroadcasting(std::function<bool(TBaseChangeSender*, ui64, ui64)> f,
+ ui64 partitionId, const TVector<ui64>& broadcasting);
+
protected:
template <typename T>
void RemoveRecords(TVector<T>&& records) {
@@ -110,6 +127,11 @@ protected:
ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}
+ template <>
+ void RemoveRecords(TVector<ui64>&& records) {
+ ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
+ }
+
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
void KillSenders() override;
void RemoveRecords() override;
@@ -141,6 +163,7 @@ private:
TSet<TEnqueuedRecord> Enqueued;
TSet<TRequestedRecord> PendingBody;
TMap<ui64, TChangeRecord> PendingSent; // ui64 is order
+ THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order
TVector<ui64> GonePartitions;