aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2023-12-19 14:36:04 +0300
committerGitHub <noreply@github.com>2023-12-19 14:36:04 +0300
commitc5b9fe2d7a89c892ce42b53171dcd910a450dc46 (patch)
tree917eef4515484e1288a30d0b36f4701e03cb1ee5
parent00eda0ef0280453bcea25c498e1c8356e483a489 (diff)
downloadydb-c5b9fe2d7a89c892ce42b53171dcd910a450dc46.tar.gz
Optimize heartbeats emission KIKIMR-20392 (#557)
-rw-r--r--ydb/core/persqueue/partition.cpp1
-rw-r--r--ydb/core/persqueue/partition.h1
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.cpp11
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.h4
-rw-r--r--ydb/core/persqueue/partition_write.cpp71
-rw-r--r--ydb/core/persqueue/sourceid.cpp97
-rw-r--r--ydb/core/persqueue/sourceid.h19
-rw-r--r--ydb/core/persqueue/ut/sourceid_ut.cpp137
8 files changed, 262 insertions, 79 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 7d814dd3f26..64aebab88e6 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -187,6 +187,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui
, NumChannels(numChannels)
, WriteBufferIsFullCounter(nullptr)
, WriteLagMs(TDuration::Minutes(1), 100)
+ , LastEmittedHeartbeat(TRowVersion::Min())
{
TabletCounters.Populate(Counters);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 914e330e7cd..dd0fa7e36ea 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -755,6 +755,7 @@ private:
TInstant LastUsedStorageMeterTimestamp;
TDeque<std::unique_ptr<IEventBase>> PendingEvents;
+ TRowVersion LastEmittedHeartbeat;
};
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp
index d6aabef3e89..0e8f6d78127 100644
--- a/ydb/core/persqueue/partition_sourcemanager.cpp
+++ b/ydb/core/persqueue/partition_sourcemanager.cpp
@@ -236,7 +236,7 @@ TPartitionSourceManager::TModificationBatch::~TModificationBatch() {
}
}
-TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const {
+TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmitHeartbeat() const {
return HeartbeatEmitter.CanEmit();
}
@@ -331,13 +331,8 @@ void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TI
}
}
-void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) {
- Batch.HeartbeatEmitter.Process(SourceId, heartbeat);
- if (InMemory == MemoryStorage().end()) {
- Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, std::move(heartbeat));
- } else {
- Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat)));
- }
+void TPartitionSourceManager::TSourceManager::Update(THeartbeat&& heartbeat) {
+ Batch.HeartbeatEmitter.Process(SourceId, std::move(heartbeat));
}
TPartitionSourceManager::TSourceManager::operator bool() const {
diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h
index 4899e1598af..a05a51aa970 100644
--- a/ydb/core/persqueue/partition_sourcemanager.h
+++ b/ydb/core/persqueue/partition_sourcemanager.h
@@ -50,7 +50,7 @@ public:
std::optional<ui64> UpdatedSeqNo() const;
void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
- void Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat);
+ void Update(THeartbeat&& heartbeat);
operator bool() const;
@@ -77,7 +77,7 @@ public:
TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format);
~TModificationBatch();
- TMaybe<THeartbeat> CanEmit() const;
+ TMaybe<THeartbeat> CanEmitHeartbeat() const;
TSourceManager GetSource(const TString& id);
void Cancel();
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 204d96b38c9..164e21643b9 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -910,13 +910,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
<< " version " << *hbVersion
);
- auto heartbeat = THeartbeat{
- .Version = *hbVersion,
- .Data = p.Msg.Data,
- };
-
- sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat));
-
+ sourceId.Update(THeartbeat{*hbVersion, p.Msg.Data});
return ProcessResult::Continue;
}
@@ -1188,6 +1182,41 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
}
+ if (const auto heartbeat = sourceIdBatch.CanEmitHeartbeat()) {
+ if (heartbeat->Version > LastEmittedHeartbeat) {
+ LOG_INFO_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "Topic '" << TopicName() << "' partition " << Partition
+ << " emit heartbeat " << heartbeat->Version
+ );
+
+ auto hbMsg = TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
+ .SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
+ .SeqNo = 0, // we don't use SeqNo because we disable deduplication
+ .PartNo = 0,
+ .TotalParts = 1,
+ .TotalSize = static_cast<ui32>(heartbeat->Data.size()),
+ .CreateTimestamp = CurrentTimestamp.MilliSeconds(),
+ .ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
+ .DisableDeduplication = true,
+ .WriteTimestamp = CurrentTimestamp.MilliSeconds(),
+ .Data = heartbeat->Data,
+ .UncompressedSize = 0,
+ .PartitionKey = {},
+ .ExplicitHashKey = {},
+ .External = false,
+ .IgnoreQuotaDeadline = true,
+ .HeartbeatVersion = std::nullopt,
+ }};
+
+ WriteInflightSize += heartbeat->Data.size();
+ auto result = ProcessRequest(hbMsg, parameters, request, ctx);
+ Y_ABORT_UNLESS(result == ProcessResult::Continue);
+
+ LastEmittedHeartbeat = heartbeat->Version;
+ }
+ }
+
UpdateWriteBufferIsFullState(ctx.Now());
if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) {
@@ -1385,34 +1414,6 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
}
}
- if (const auto heartbeat = sourceIdBatch.CanEmit()) {
- LOG_INFO_S(
- ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicName() << "' partition " << Partition
- << " emit heartbeat " << heartbeat->Version
- );
-
- EmplaceRequest(TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
- .SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
- .SeqNo = 0, // we don't use SeqNo because we disable deduplication
- .PartNo = 0,
- .TotalParts = 1,
- .TotalSize = static_cast<ui32>(heartbeat->Data.size()),
- .CreateTimestamp = CurrentTimestamp.MilliSeconds(),
- .ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
- .DisableDeduplication = true,
- .WriteTimestamp = CurrentTimestamp.MilliSeconds(),
- .Data = heartbeat->Data,
- .UncompressedSize = 0,
- .PartitionKey = {},
- .ExplicitHashKey = {},
- .External = false,
- .IgnoreQuotaDeadline = true,
- .HeartbeatVersion = std::nullopt,
- }}, ctx);
- WriteInflightSize += heartbeat->Data.size();
- }
-
if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed
if (!sourceIdBatch.HasModifications()) {
return request->Record.CmdWriteSize() > 0
diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp
index 4d124f7eda0..e2176d8c337 100644
--- a/ydb/core/persqueue/sourceid.cpp
+++ b/ydb/core/persqueue/sourceid.cpp
@@ -79,13 +79,6 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark,
void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd);
}
-THeartbeatProcessor::THeartbeatProcessor(
- const THashSet<TString>& sourceIdsWithHeartbeat,
- const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat)
- : SourceIdsWithHeartbeat(sourceIdsWithHeartbeat)
- , SourceIdsByHeartbeat(sourceIdsByHeartbeat)
-{
-}
void THeartbeatProcessor::ApplyHeartbeat(const TString& sourceId, const TRowVersion& version) {
SourceIdsWithHeartbeat.insert(sourceId);
@@ -501,29 +494,32 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti
/// THeartbeatEmitter
THeartbeatEmitter::THeartbeatEmitter(const TSourceIdStorage& storage)
- : THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat)
- , Storage(storage)
+ : Storage(storage)
{
}
-void THeartbeatEmitter::Process(const TString& sourceId, const THeartbeat& heartbeat) {
- Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(sourceId));
- const auto& sourceIdInfo = Storage.InMemorySourceIds.at(sourceId);
+void THeartbeatEmitter::Process(const TString& sourceId, THeartbeat&& heartbeat) {
+ auto it = Storage.InMemorySourceIds.find(sourceId);
+ if (it != Storage.InMemorySourceIds.end() && it->second.LastHeartbeat) {
+ if (heartbeat.Version <= it->second.LastHeartbeat->Version) {
+ return;
+ }
+ }
- if (const auto& lastHeartbeat = sourceIdInfo.LastHeartbeat) {
- ForgetHeartbeat(sourceId, lastHeartbeat->Version);
+ if (!Storage.SourceIdsWithHeartbeat.contains(sourceId)) {
+ NewSourceIdsWithHeartbeat.insert(sourceId);
}
- if (LastHeartbeats.contains(sourceId)) {
- ForgetHeartbeat(sourceId, LastHeartbeats.at(sourceId).Version);
+ if (Heartbeats.contains(sourceId)) {
+ ForgetHeartbeat(sourceId, Heartbeats.at(sourceId).Version);
}
ApplyHeartbeat(sourceId, heartbeat.Version);
- LastHeartbeats[sourceId] = heartbeat;
+ Heartbeats[sourceId] = std::move(heartbeat);
}
TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const {
- if (SourceIdsWithHeartbeat.size() != Storage.ExplicitSourceIds.size()) {
+ if (Storage.ExplicitSourceIds.size() != (Storage.SourceIdsWithHeartbeat.size() + NewSourceIdsWithHeartbeat.size())) {
return Nothing();
}
@@ -531,19 +527,68 @@ TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const {
return Nothing();
}
- auto it = SourceIdsByHeartbeat.begin();
- if (Storage.SourceIdsByHeartbeat.empty() || it->first > Storage.SourceIdsByHeartbeat.begin()->first) {
- Y_ABORT_UNLESS(!it->second.empty());
- const auto& someSourceId = *it->second.begin();
+ if (!NewSourceIdsWithHeartbeat.empty()) { // just got quorum
+ if (!Storage.SourceIdsByHeartbeat.empty() && Storage.SourceIdsByHeartbeat.begin()->first < SourceIdsByHeartbeat.begin()->first) {
+ return GetFromStorage(Storage.SourceIdsByHeartbeat.begin());
+ } else {
+ return GetFromDiff(SourceIdsByHeartbeat.begin());
+ }
+ } else if (SourceIdsByHeartbeat.begin()->first > Storage.SourceIdsByHeartbeat.begin()->first) {
+ auto storage = Storage.SourceIdsByHeartbeat.begin();
+ auto diff = SourceIdsByHeartbeat.begin();
+
+ TMaybe<TRowVersion> newVersion;
+ while (storage != Storage.SourceIdsByHeartbeat.end()) {
+ const auto& [version, sourceIds] = *storage;
+
+ auto rest = sourceIds.size();
+ for (const auto& sourceId : sourceIds) {
+ auto it = Heartbeats.find(sourceId);
+ if (it != Heartbeats.end() && it->second.Version > version && version <= diff->first) {
+ --rest;
+ } else {
+ break;
+ }
+ }
- if (LastHeartbeats.contains(someSourceId)) {
- return LastHeartbeats.at(someSourceId);
- } else if (Storage.InMemorySourceIds.contains(someSourceId)) {
- return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat;
+ if (!rest) {
+ if (++storage != Storage.SourceIdsByHeartbeat.end()) {
+ newVersion = storage->first;
+ } else {
+ newVersion = diff->first;
+ }
+ } else {
+ break;
+ }
+ }
+
+ if (newVersion) {
+ storage = Storage.SourceIdsByHeartbeat.find(*newVersion);
+ if (storage != Storage.SourceIdsByHeartbeat.end()) {
+ return GetFromStorage(storage);
+ } else {
+ return GetFromDiff(diff);
+ }
}
}
return Nothing();
}
+TMaybe<THeartbeat> THeartbeatEmitter::GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const {
+ Y_ABORT_UNLESS(!it->second.empty());
+ const auto& someSourceId = *it->second.begin();
+
+ Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(someSourceId));
+ return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat;
+}
+
+TMaybe<THeartbeat> THeartbeatEmitter::GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const {
+ Y_ABORT_UNLESS(!it->second.empty());
+ const auto& someSourceId = *it->second.begin();
+
+ Y_ABORT_UNLESS(Heartbeats.contains(someSourceId));
+ return Heartbeats.at(someSourceId);
+}
+
}
diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h
index 9ddf494f381..897014363f2 100644
--- a/ydb/core/persqueue/sourceid.h
+++ b/ydb/core/persqueue/sourceid.h
@@ -59,19 +59,17 @@ struct TSourceIdInfo {
}; // TSourceIdInfo
class THeartbeatProcessor {
-public:
- THeartbeatProcessor() = default;
- explicit THeartbeatProcessor(
- const THashSet<TString>& sourceIdsWithHeartbeat,
- const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat);
+protected:
+ using TSourceIdsByHeartbeat = TMap<TRowVersion, THashSet<TString>>;
+public:
void ApplyHeartbeat(const TString& sourceId, const TRowVersion& version);
void ForgetHeartbeat(const TString& sourceId, const TRowVersion& version);
void ForgetSourceId(const TString& sourceId);
protected:
THashSet<TString> SourceIdsWithHeartbeat;
- TMap<TRowVersion, THashSet<TString>> SourceIdsByHeartbeat;
+ TSourceIdsByHeartbeat SourceIdsByHeartbeat;
}; // THeartbeatProcessor
@@ -151,12 +149,17 @@ class THeartbeatEmitter: private THeartbeatProcessor {
public:
explicit THeartbeatEmitter(const TSourceIdStorage& storage);
- void Process(const TString& sourceId, const THeartbeat& heartbeat);
+ void Process(const TString& sourceId, THeartbeat&& heartbeat);
TMaybe<THeartbeat> CanEmit() const;
private:
+ TMaybe<THeartbeat> GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const;
+ TMaybe<THeartbeat> GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const;
+
+private:
const TSourceIdStorage& Storage;
- THashMap<TString, THeartbeat> LastHeartbeats;
+ THashSet<TString> NewSourceIdsWithHeartbeat;
+ THashMap<TString, THeartbeat> Heartbeats;
}; // THeartbeatEmitter
diff --git a/ydb/core/persqueue/ut/sourceid_ut.cpp b/ydb/core/persqueue/ut/sourceid_ut.cpp
index 07a0a3944bb..43164c888c5 100644
--- a/ydb/core/persqueue/ut/sourceid_ut.cpp
+++ b/ydb/core/persqueue/ut/sourceid_ut.cpp
@@ -323,6 +323,143 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
}
}
+ inline static TSourceIdInfo MakeExplicitSourceIdInfo(ui64 offset, const TMaybe<THeartbeat>& heartbeat = Nothing()) {
+ auto info = TSourceIdInfo(0, offset, TInstant::Now());
+
+ info.Explicit = true;
+ if (heartbeat) {
+ info.LastHeartbeat = heartbeat;
+ }
+
+ return info;
+ }
+
+ inline static THeartbeat MakeHeartbeat(ui64 step) {
+ return THeartbeat{
+ .Version = TRowVersion(step, 0),
+ .Data = "",
+ };
+ }
+
+ Y_UNIT_TEST(HeartbeatEmitter) {
+ TSourceIdStorage storage;
+ ui64 offset = 0;
+
+ // initial info w/o heartbeats
+ for (ui64 i = 1; i <= 2; ++i) {
+ storage.RegisterSourceId(TestSourceId(i), MakeExplicitSourceIdInfo(++offset));
+ }
+ {
+ THeartbeatEmitter emitter(storage);
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(1), MakeHeartbeat(1));
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(1), MakeHeartbeat(2));
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(1));
+ {
+ const auto heartbeat = emitter.CanEmit();
+ UNIT_ASSERT(heartbeat.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(1).Version);
+ }
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(3));
+ {
+ const auto heartbeat = emitter.CanEmit();
+ UNIT_ASSERT(heartbeat.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(2).Version);
+ }
+ }
+
+ // one heartbeat
+ storage.RegisterSourceId(TestSourceId(1), MakeExplicitSourceIdInfo(+offset, MakeHeartbeat(4)));
+ {
+ THeartbeatEmitter emitter(storage);
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(3));
+ {
+ const auto heartbeat = emitter.CanEmit();
+ UNIT_ASSERT(heartbeat.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(3).Version);
+ }
+ }
+ {
+ THeartbeatEmitter emitter(storage);
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(5));
+ {
+ const auto heartbeat = emitter.CanEmit();
+ UNIT_ASSERT(heartbeat.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(4).Version);
+ }
+ }
+
+ // two different heartbeats
+ storage.RegisterSourceId(TestSourceId(2), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(5)));
+ {
+ THeartbeatEmitter emitter(storage);
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(6));
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(1), MakeHeartbeat(5));
+ {
+ const auto heartbeat = emitter.CanEmit();
+ UNIT_ASSERT(heartbeat.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(5).Version);
+ }
+ }
+ {
+ THeartbeatEmitter emitter(storage);
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(6));
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(1), MakeHeartbeat(7));
+ {
+ const auto heartbeat = emitter.CanEmit();
+ UNIT_ASSERT(heartbeat.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(6).Version);
+ }
+ }
+
+ // two same heartbeats
+ storage.RegisterSourceId(TestSourceId(1), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(5)));
+ {
+ THeartbeatEmitter emitter(storage);
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(1), MakeHeartbeat(6));
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(6));
+ {
+ const auto heartbeat = emitter.CanEmit();
+ UNIT_ASSERT(heartbeat.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(6).Version);
+ }
+ }
+
+ // can't roll back
+ {
+ THeartbeatEmitter emitter(storage);
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(1), MakeHeartbeat(4));
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+
+ emitter.Process(TestSourceId(2), MakeHeartbeat(4));
+ UNIT_ASSERT(!emitter.CanEmit().Defined());
+ }
+ }
+
} // TSourceIdTests
} // namespace NKikimr::NPQ