diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-09-20 21:40:37 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-09-20 21:54:33 +0300 |
commit | bcf08f410f67d673ba69f09767cf07c2d88b7f1c (patch) | |
tree | 0370611257b183acbf87a901d806e5ffa00b1c28 | |
parent | 75a840cd0de3c92a4c7b502f3d5961a0cf2e91e6 (diff) | |
download | ydb-bcf08f410f67d673ba69f09767cf07c2d88b7f1c.tar.gz |
Heartbeats aggregation KIKIMR-18159
21 files changed, 360 insertions, 39 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt index df9a5e8e9b5..e15e17c61c1 100644 --- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt @@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index 0526d277980..35c8a463994 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -51,6 +51,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt index 0526d277980..35c8a463994 100644 --- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt @@ -51,6 +51,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt index df9a5e8e9b5..e15e17c61c1 100644 --- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt @@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index a12b0e00d79..0ec89e64a65 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -2,11 +2,13 @@ #include "global.h" -#include <ydb/library/persqueue/topic_parser/topic_parser.h> +#include <ydb/core/base/row_version.h> #include <ydb/core/protos/pqconfig.pb.h> -#include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/persqueue/key.h> #include <ydb/core/persqueue/metering_sink.h> +#include <ydb/core/tablet/tablet_counters.h> +#include <ydb/library/persqueue/topic_parser/topic_parser.h> + #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/core/actorid.h> @@ -161,6 +163,8 @@ struct TEvPQ { TString ExplicitHashKey; bool External; bool IgnoreQuotaDeadline; + // If specified, Data will contain heartbeat's data + std::optional<TRowVersion> HeartbeatVersion; }; TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite) diff --git a/ydb/core/persqueue/heartbeat.cpp b/ydb/core/persqueue/heartbeat.cpp new file mode 100644 index 00000000000..bb78daf08f0 --- /dev/null +++ b/ydb/core/persqueue/heartbeat.cpp @@ -0,0 +1,20 @@ +#include "heartbeat.h" + +#include <ydb/core/protos/pqconfig.pb.h> + +namespace NKikimr::NPQ { + +THeartbeat THeartbeat::Parse(const NKikimrPQ::THeartbeat& proto) { + return THeartbeat{ + .Version = TRowVersion(proto.GetStep(), proto.GetTxId()), + .Data = proto.GetData(), + }; +} + +void THeartbeat::Serialize(NKikimrPQ::THeartbeat& proto) const { + proto.SetStep(Version.Step); + proto.SetTxId(Version.TxId); + proto.SetData(Data); +} + +} diff --git a/ydb/core/persqueue/heartbeat.h b/ydb/core/persqueue/heartbeat.h new file mode 100644 index 00000000000..3363539f0a8 --- /dev/null +++ b/ydb/core/persqueue/heartbeat.h @@ -0,0 +1,21 @@ +#pragma once + +#include <ydb/core/base/row_version.h> + +#include <util/generic/string.h> + +namespace NKikimrPQ { + class THeartbeat; +} + +namespace NKikimr::NPQ { + +struct THeartbeat { + TRowVersion Version; + TString Data; + + static THeartbeat Parse(const NKikimrPQ::THeartbeat& proto); + void Serialize(NKikimrPQ::THeartbeat& proto) const; +}; + +} diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 1c1a5610de5..85f404cf884 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -198,7 +198,8 @@ private: void UpdateWriteBufferIsFullState(const TInstant& now); TInstant GetWriteTimeEstimate(ui64 offset) const; - bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter); + bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, + TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter); bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx); bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx); bool IsQuotingEnabled() const; diff --git a/ydb/core/persqueue/partition_monitoring.cpp b/ydb/core/persqueue/partition_monitoring.cpp index 0348762e2b5..aa8323861c8 100644 --- a/ydb/core/persqueue/partition_monitoring.cpp +++ b/ydb/core/persqueue/partition_monitoring.cpp @@ -201,6 +201,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo TABLEH() {out << "CreateTimestamp";} TABLEH() {out << "Explicit";} TABLEH() {out << "State";} + TABLEH() {out << "LastHeartbeat";} } } TABLEBODY() { @@ -213,6 +214,11 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo TABLED() {out << ToStringLocalTimeUpToSeconds(sourceIdInfo.CreateTimestamp);} TABLED() {out << (sourceIdInfo.Explicit ? "true" : "false");} TABLED() {out << sourceIdInfo.State;} + if (const auto& hb = sourceIdInfo.LastHeartbeat) { + TABLED() {out << hb->Version;} + } else { + TABLED() {out << "null";} + } } } } diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 1fe69aa2623..63a69d0713d 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -9,6 +9,7 @@ #include <ydb/core/base/counters.h> #include <ydb/core/base/path.h> #include <ydb/core/quoter/public/quoter.h> +#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/protos/counters_pq.pb.h> #include <ydb/core/protos/msgbus.pb.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> @@ -306,10 +307,15 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { } if (!already && partNo + 1 == totalParts) { if (it == SourceIdStorage.GetInMemorySourceIds().end()) { + Y_VERIFY(!writeResponse.Msg.HeartbeatVersion); TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); - SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp); + SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp); + } else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) { + SourceIdStorage.RegisterSourceId(s, it->second.Updated( + seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data} + )); } else { - SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp)); + SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, offset, CurrentTimestamp)); } TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1); @@ -332,7 +338,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { PartitionWriteQuotaWaitCounter->IncFor(quotedTime.MilliSeconds()); } - if (!already && partNo + 1 == totalParts) + if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion) ++offset; } else if (response.IsOwnership()) { const TString& ownerCookie = response.GetOwnership().OwnerCookie; @@ -795,7 +801,8 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T } bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, - TSourceIdWriter& sourceIdWriter) { + TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter) +{ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition); ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset; @@ -899,6 +906,39 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const continue; } + if (const auto& hbVersion = p.Msg.HeartbeatVersion) { + if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) { + CancelAllWritesOnWrite(ctx, request, TStringBuilder() + << "Cannot apply heartbeat on unknown sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter); + return false; + } + if (!it_inMemory->second.Explicit) { + CancelAllWritesOnWrite(ctx, request, TStringBuilder() + << "Cannot apply heartbeat on implcit sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter); + return false; + } + + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicName() << "' partition " << Partition + << " process heartbeat sourceId '" << EscapeC(p.Msg.SourceId) << "'" + << " version " << *hbVersion + ); + + auto heartbeat = THeartbeat{ + .Version = *hbVersion, + .Data = p.Msg.Data, + }; + + heartbeatEmitter.Process(p.Msg.SourceId, heartbeat); + sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated( + p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat) + )); + + EmplaceResponse(std::move(pp), ctx); + continue; + } + if (poffset < curOffset) { //too small offset CancelAllWritesOnWrite(ctx, request, TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo @@ -1296,12 +1336,14 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c Y_VERIFY(request->Record.CmdWriteSize() == 0); Y_VERIFY(request->Record.CmdRenameSize() == 0); Y_VERIFY(request->Record.CmdDeleteRangeSize() == 0); + const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo() ? ESourceIdFormat::Proto : ESourceIdFormat::Raw; TSourceIdWriter sourceIdWriter(format); + THeartbeatEmitter heartbeatEmitter(SourceIdStorage); - bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter); + bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter, heartbeatEmitter); if (headCleared) { Y_VERIFY(!CompactedKeys.empty() || Head.PackedSize == 0); @@ -1310,6 +1352,34 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c } } + if (const auto heartbeat = heartbeatEmitter.CanEmit()) { + LOG_INFO_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicName() << "' partition " << Partition + << " emit heartbeat " << heartbeat->Version + ); + + EmplaceRequest(TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{ + .SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)), + .SeqNo = 0, + .PartNo = 0, + .TotalParts = 1, + .TotalSize = static_cast<ui32>(heartbeat->Data.size()), + .CreateTimestamp = CurrentTimestamp.MilliSeconds(), + .ReceiveTimestamp = CurrentTimestamp.MilliSeconds(), + .DisableDeduplication = true, + .WriteTimestamp = CurrentTimestamp.MilliSeconds(), + .Data = heartbeat->Data, + .UncompressedSize = 0, + .PartitionKey = {}, + .ExplicitHashKey = {}, + .External = false, + .IgnoreQuotaDeadline = true, + .HeartbeatVersion = std::nullopt, + }}, ctx); + WriteInflightSize += heartbeat->Data.size(); + } + if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed if (sourceIdWriter.GetSourceIdsToWrite().empty()) { return request->Record.CmdWriteSize() > 0 diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 6503e3b4e09..236b6ebac3f 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -30,6 +30,7 @@ static constexpr char TMP_REQUEST_MARKER[] = "__TMP__REQUEST__MARKER__"; static constexpr ui32 CACHE_SIZE = 100_MB; static constexpr ui32 MAX_BYTES = 25_MB; static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048; +static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB; struct TPartitionInfo { TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange, @@ -1757,7 +1758,9 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p TString errorStr = ""; if (!cmd.HasSeqNo() && !req.GetIsDirectWrite()) { errorStr = "no SeqNo"; - } else if (!cmd.HasData() || cmd.GetData().empty()){ + } else if (cmd.HasData() && cmd.HasHeartbeat()) { + errorStr = "Data and Heartbeat are mutually exclusive"; + } else if (cmd.GetData().empty() && cmd.GetHeartbeat().GetData().empty()) { errorStr = "empty Data"; } else if ((!cmd.HasSourceId() || cmd.GetSourceId().empty()) && !req.GetIsDirectWrite() && !cmd.GetDisableDeduplication()) { errorStr = "empty SourceId"; @@ -1779,6 +1782,10 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p errorStr = "Too big SourceId"; } else if (mirroredPartition && !cmd.GetDisableDeduplication()) { errorStr = "Write to mirrored topic is forbiden"; + } else if (cmd.HasHeartbeat() && cmd.GetHeartbeat().GetData().size() > MAX_HEARTBEAT_SIZE) { + errorStr = "Too big Heartbeat"; + } else if (cmd.HasHeartbeat() && cmd.HasTotalParts() && cmd.GetTotalParts() != 1) { + errorStr = "Heartbeat must be a single-part message"; } ui64 createTimestampMs = 0, writeTimestampMs = 0; if (cmd.HasCreateTimeMS() && cmd.GetCreateTimeMS() >= 0) @@ -1798,6 +1805,12 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p Y_VERIFY(mSize > 204800); ui64 receiveTimestampMs = TAppData::TimeProvider->Now().MilliSeconds(); bool disableDeduplication = cmd.GetDisableDeduplication(); + + std::optional<TRowVersion> heartbeatVersion; + if (cmd.HasHeartbeat()) { + heartbeatVersion.emplace(cmd.GetHeartbeat().GetStep(), cmd.GetHeartbeat().GetTxId()); + } + if (cmd.GetData().size() > mSize) { if (cmd.HasPartNo()) { ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, @@ -1826,7 +1839,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p totalParts, totalSize, createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, data, uncompressedSize, cmd.GetPartitionKey(), cmd.GetExplicitHash(), cmd.GetExternalOperation(), - cmd.GetIgnoreQuotaDeadline() + cmd.GetIgnoreQuotaDeadline(), heartbeatVersion }); partNo++; uncompressedSize = 0; @@ -1840,13 +1853,29 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p ); } Y_VERIFY(partNo == totalParts); + } else if (cmd.GetHeartbeat().GetData().size() > mSize) { + Y_VERIFY_DEBUG(false, "This should never happen"); + ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() + << "Too big heartbeat message, must be at most " << mSize << ", but got " << cmd.GetHeartbeat().GetData().size()); + return; } else { + ui32 totalSize = cmd.GetData().Size(); + if (cmd.HasHeartbeat()) { + totalSize = cmd.GetHeartbeat().GetData().Size(); + } + if (cmd.HasTotalSize()) { + totalSize = cmd.GetTotalSize(); + } + + const auto& data = cmd.HasHeartbeat() + ? cmd.GetHeartbeat().GetData() + : cmd.GetData(); + msgs.push_back({cmd.GetSourceId(), static_cast<ui64>(cmd.GetSeqNo()), static_cast<ui16>(cmd.HasPartNo() ? cmd.GetPartNo() : 0), - static_cast<ui16>(cmd.HasPartNo() ? cmd.GetTotalParts() : 1), - static_cast<ui32>(cmd.HasTotalSize() ? cmd.GetTotalSize() : cmd.GetData().Size()), - createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, cmd.GetData(), + static_cast<ui16>(cmd.HasPartNo() ? cmd.GetTotalParts() : 1), totalSize, + createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, data, cmd.HasUncompressedSize() ? cmd.GetUncompressedSize() : 0u, cmd.GetPartitionKey(), cmd.GetExplicitHash(), - cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline() + cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline(), heartbeatVersion }); } LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index 2eb8ea0a40f..1a0c1528f05 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -7,8 +7,7 @@ #include <algorithm> -namespace NKikimr { -namespace NPQ { +namespace NKikimr::NPQ { static constexpr ui64 MAX_DELETE_COMMAND_SIZE = 10_MB; static constexpr ui64 MAX_DELETE_COMMAND_COUNT = 1000; @@ -80,6 +79,34 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark, void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) { FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd); } +THeartbeatProcessor::THeartbeatProcessor( + const THashSet<TString>& sourceIdsWithHeartbeat, + const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat) + : SourceIdsWithHeartbeat(sourceIdsWithHeartbeat) + , SourceIdsByHeartbeat(sourceIdsByHeartbeat) +{ +} + +void THeartbeatProcessor::ApplyHeartbeat(const TString& sourceId, const TRowVersion& version) { + SourceIdsWithHeartbeat.insert(sourceId); + SourceIdsByHeartbeat[version].insert(sourceId); +} + +void THeartbeatProcessor::ForgetHeartbeat(const TString& sourceId, const TRowVersion& version) { + auto it = SourceIdsByHeartbeat.find(version); + if (it == SourceIdsByHeartbeat.end()) { + return; + } + + it->second.erase(sourceId); + if (it->second.empty()) { + SourceIdsByHeartbeat.erase(it); + } +} + +void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) { + SourceIdsWithHeartbeat.erase(sourceId); +} TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs) : SeqNo(seqNo) @@ -110,6 +137,13 @@ TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) return copy; } +TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const { + auto copy = Updated(seqNo, offset, writeTs); + copy.LastHeartbeat = std::move(heartbeat); + + return copy; +} + TSourceIdInfo TSourceIdInfo::Parse(const TString& data, TInstant now) { Y_VERIFY(data.size() >= 2 * sizeof(ui64), "Data must contain at least SeqNo & Offset"); ui32 pos = 0; @@ -152,6 +186,9 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) { if (proto.HasKeyRange()) { result.KeyRange = TPartitionKeyRange::Parse(proto.GetKeyRange()); } + if (proto.HasLastHeartbeat()) { + result.LastHeartbeat = THeartbeat::Parse(proto.GetLastHeartbeat()); + } return result; } @@ -168,6 +205,9 @@ void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const { if (KeyRange) { KeyRange->Serialize(*proto.MutableKeyRange()); } + if (LastHeartbeat) { + LastHeartbeat->Serialize(*proto.MutableLastHeartbeat()); + } } bool TSourceIdInfo::operator==(const TSourceIdInfo& rhs) const { @@ -202,6 +242,15 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) { return; } + ForgetSourceId(sourceId); + if (const auto& heartbeat = it->second.LastHeartbeat) { + ForgetHeartbeat(sourceId, heartbeat->Version); + } + + if (it->second.Explicit) { + ExplicitSourceIds.erase(sourceId); + } + SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId)); InMemorySourceIds.erase(it); @@ -328,11 +377,24 @@ void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdIn } } - const auto offset = sourceIdInfo.Offset; - InMemorySourceIds[sourceId] = std::move(sourceIdInfo); - - const bool res = SourceIdsByOffset.emplace(offset, sourceId).second; + const bool res = SourceIdsByOffset.emplace(sourceIdInfo.Offset, sourceId).second; Y_VERIFY(res); + + if (sourceIdInfo.Explicit) { + ExplicitSourceIds.insert(sourceId); + } + + if (const auto& heartbeat = sourceIdInfo.LastHeartbeat) { + Y_VERIFY(sourceIdInfo.Explicit); + + if (it != InMemorySourceIds.end() && it->second.LastHeartbeat) { + ForgetHeartbeat(sourceId, it->second.LastHeartbeat->Version); + } + + ApplyHeartbeat(sourceId, heartbeat->Version); + } + + InMemorySourceIds[sourceId] = std::move(sourceIdInfo); } void TSourceIdStorage::RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie) { @@ -428,5 +490,51 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti } } -} // NPQ -} // NKikimr +/// THeartbeatEmitter +THeartbeatEmitter::THeartbeatEmitter(const TSourceIdStorage& storage) + : THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat) + , Storage(storage) +{ +} + +void THeartbeatEmitter::Process(const TString& sourceId, const THeartbeat& heartbeat) { + Y_VERIFY(Storage.InMemorySourceIds.contains(sourceId)); + const auto& sourceIdInfo = Storage.InMemorySourceIds.at(sourceId); + + if (const auto& lastHeartbeat = sourceIdInfo.LastHeartbeat) { + ForgetHeartbeat(sourceId, lastHeartbeat->Version); + } + + if (LastHeartbeats.contains(sourceId)) { + ForgetHeartbeat(sourceId, LastHeartbeats.at(sourceId).Version); + } + + ApplyHeartbeat(sourceId, heartbeat.Version); + LastHeartbeats[sourceId] = heartbeat; +} + +TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const { + if (SourceIdsWithHeartbeat.size() != Storage.ExplicitSourceIds.size()) { + return Nothing(); + } + + if (SourceIdsByHeartbeat.empty()) { + return Nothing(); + } + + auto it = SourceIdsByHeartbeat.begin(); + if (Storage.SourceIdsByHeartbeat.empty() || it->first > Storage.SourceIdsByHeartbeat.begin()->first) { + Y_VERIFY(!it->second.empty()); + const auto& someSourceId = *it->second.begin(); + + if (LastHeartbeats.contains(someSourceId)) { + return LastHeartbeats.at(someSourceId); + } else if (Storage.InMemorySourceIds.contains(someSourceId)) { + return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat; + } + } + + return Nothing(); +} + +} diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 62b9c2834e2..1c6cfd34bf8 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/keyvalue/keyvalue_events.h> +#include <ydb/core/persqueue/heartbeat.h> #include <ydb/core/persqueue/key.h> #include <ydb/core/persqueue/ownerinfo.h> #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> @@ -8,8 +9,7 @@ #include <util/generic/set.h> -namespace NKikimr { -namespace NPQ { +namespace NKikimr::NPQ { enum class ESourceIdFormat: ui8 { Raw = 0, @@ -29,6 +29,7 @@ struct TSourceIdInfo { TInstant CreateTimestamp; bool Explicit = false; TMaybe<TPartitionKeyRange> KeyRange; + TMaybe<THeartbeat> LastHeartbeat; EState State = EState::Registered; TSourceIdInfo() = default; @@ -36,6 +37,7 @@ struct TSourceIdInfo { TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false); TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const; + TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const; static EState ConvertState(NKikimrPQ::TMessageGroupInfo::EState value); static NKikimrPQ::TMessageGroupInfo::EState ConvertState(EState value); @@ -55,9 +57,29 @@ struct TSourceIdInfo { }; // TSourceIdInfo +class THeartbeatProcessor { +public: + THeartbeatProcessor() = default; + explicit THeartbeatProcessor( + const THashSet<TString>& sourceIdsWithHeartbeat, + const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat); + + void ApplyHeartbeat(const TString& sourceId, const TRowVersion& version); + void ForgetHeartbeat(const TString& sourceId, const TRowVersion& version); + void ForgetSourceId(const TString& sourceId); + +protected: + THashSet<TString> SourceIdsWithHeartbeat; + TMap<TRowVersion, THashSet<TString>> SourceIdsByHeartbeat; + +}; // THeartbeatProcessor + using TSourceIdMap = THashMap<TString, TSourceIdInfo>; +class THeartbeatEmitter; + +class TSourceIdStorage: private THeartbeatProcessor { + friend class THeartbeatEmitter; -class TSourceIdStorage { public: const TSourceIdMap& GetInMemorySourceIds() const { return InMemorySourceIds; @@ -88,6 +110,8 @@ private: THashMap<TString, TString> SourceIdOwners; TVector<TString> OwnersToDrop; TSet<std::pair<ui64, TString>> SourceIdsByOffset; + // used to track heartbeats + THashSet<TString> ExplicitSourceIds; }; // TSourceIdStorage @@ -122,8 +146,20 @@ private: }; // TSourceIdWriter -} // NPQ -} // NKikimr +class THeartbeatEmitter: private THeartbeatProcessor { +public: + explicit THeartbeatEmitter(const TSourceIdStorage& storage); + + void Process(const TString& sourceId, const THeartbeat& heartbeat); + TMaybe<THeartbeat> CanEmit() const; + +private: + const TSourceIdStorage& Storage; + THashMap<TString, THeartbeat> LastHeartbeats; + +}; // THeartbeatEmitter + +} Y_DECLARE_OUT_SPEC(inline, NKikimr::NPQ::TSourceIdInfo, out, value) { return value.Out(out); diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index 5b22490e45c..435108c3276 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -7,6 +7,7 @@ SRCS( event_helpers.cpp fetch_request_actor.cpp header.cpp + heartbeat.cpp metering_sink.cpp mirrorer.cpp mirrorer.h diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index d3d09f1b3f1..ce29023c222 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -74,6 +74,8 @@ message TPersQueuePartitionRequest { // Do not reject request when quota exceeded optional bool IgnoreQuotaDeadline = 16 [ default = false ]; + + optional NKikimrPQ.THeartbeat Heartbeat = 17; // mutually exclusive with Data & related fields } message TCmdUpdateWriteTimestamp { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 84b21954cb2..a67b5a4c276 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -359,6 +359,12 @@ message TPQTabletConfig { optional TPartitionStrategy PartitionStrategy = 35; } +message THeartbeat { + optional uint64 Step = 1; + optional uint64 TxId = 2; + optional bytes Data = 3; +} + message TMessageGroupInfo { enum EState { STATE_UNKNOWN = 0; @@ -373,6 +379,7 @@ message TMessageGroupInfo { optional bool Explicit = 5; optional TPartitionKeyRange KeyRange = 6; optional EState State = 7; + optional THeartbeat LastHeartbeat = 8; } message TBootstrapConfig { diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp index 06e39899592..21da0859792 100644 --- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp +++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp @@ -33,8 +33,9 @@ public: TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_EMIT_HEARTBEATS; } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - LOG_I("Emit change records till version" - << ": " << Edge); + LOG_I("Emit change records" + << ": edge# " << Edge + << ", at tablet# " << Self->TabletID()); NIceDb::TNiceDb db(txc.DB); const auto heartbeats = Self->GetCdcStreamHeartbeatManager().EmitHeartbeats(txc.DB, Edge); @@ -68,7 +69,8 @@ public: } void Complete(const TActorContext& ctx) override { - LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"); + LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)" + << ": at tablet# " << Self->TabletID()); Self->EnqueueChangeRecords(std::move(ChangeRecords)); Self->EmitHeartbeats(ctx); } @@ -76,6 +78,9 @@ public: }; // TTxCdcStreamEmitHeartbeats void TDataShard::EmitHeartbeats(const TActorContext& ctx) { + LOG_D("Emit heartbeats" + << ": at tablet# " << TabletID()); + if (State != TShardState::Ready) { return; } diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 12a3dc815eb..c053694b59b 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -390,10 +390,7 @@ TString TChangeRecord::GetPartitionKey() const { break; } - case EKind::CdcHeartbeat: { - return {}; // not used - } - + case EKind::CdcHeartbeat: case EKind::AsyncIndex: { Y_FAIL("Not supported"); } diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 89341a2fe9b..2cbb316f66e 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -107,6 +107,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti NKikimrChangeExchange::TChangeRecord protoRecord; record.SerializeToProto(protoRecord); data.SetData(protoRecord.SerializeAsString()); + cmd.SetData(data.SerializeAsString()); break; } @@ -124,9 +125,20 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti jsonConfig.ValidateUtf8 = false; jsonConfig.WriteNanAsString = true; WriteJson(&str, &json, jsonConfig); - data.SetData(str.Str()); - cmd.SetPartitionKey(record.GetPartitionKey()); + + if (record.GetKind() == TChangeRecord::EKind::CdcDataChange) { + cmd.SetData(data.SerializeAsString()); + cmd.SetPartitionKey(record.GetPartitionKey()); + } else if (record.GetKind() == TChangeRecord::EKind::CdcHeartbeat) { + auto& heartbeat = *cmd.MutableHeartbeat(); + heartbeat.SetStep(record.GetStep()); + heartbeat.SetTxId(record.GetTxId()); + heartbeat.SetData(data.SerializeAsString()); + } else { + Y_FAIL_S("Unexpected cdc record" + << ": kind# " << record.GetKind()); + } break; } @@ -137,7 +149,6 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti } } - cmd.SetData(data.SerializeAsString()); Pending.push_back(record.GetSeqNo()); } diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index bb726a3b351..98d2e31aaf5 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -114,6 +114,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) { Self->CreateChangeSender(ctx); Self->EnqueueChangeRecords(std::move(ChangeRecords)); Self->MaybeActivateChangeSender(ctx); + Self->EmitHeartbeats(ctx); if (!Self->ChangesQueue) { if (!Self->ChangeExchangeSplitter.Done()) { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 91b1a52b714..2082c049a26 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2751,7 +2751,6 @@ Y_UNIT_TEST_SUITE(Cdc) { R"({"update":{"value":30},"key":[3]})", R"({"resolved":"***"})", R"({"resolved":"***"})", - R"({"resolved":"***"})", }); // disable stream @@ -2765,7 +2764,6 @@ Y_UNIT_TEST_SUITE(Cdc) { R"({"update":{"value":30},"key":[3]})", R"({"resolved":"***"})", R"({"resolved":"***"})", - R"({"resolved":"***"})", }); } |