aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-09-20 21:40:37 +0300
committerilnaz <ilnaz@ydb.tech>2023-09-20 21:54:33 +0300
commitbcf08f410f67d673ba69f09767cf07c2d88b7f1c (patch)
tree0370611257b183acbf87a901d806e5ffa00b1c28
parent75a840cd0de3c92a4c7b502f3d5961a0cf2e91e6 (diff)
downloadydb-bcf08f410f67d673ba69f09767cf07c2d88b7f1c.tar.gz
Heartbeats aggregation KIKIMR-18159
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/persqueue/events/internal.h8
-rw-r--r--ydb/core/persqueue/heartbeat.cpp20
-rw-r--r--ydb/core/persqueue/heartbeat.h21
-rw-r--r--ydb/core/persqueue/partition.h3
-rw-r--r--ydb/core/persqueue/partition_monitoring.cpp6
-rw-r--r--ydb/core/persqueue/partition_write.cpp80
-rw-r--r--ydb/core/persqueue/pq_impl.cpp41
-rw-r--r--ydb/core/persqueue/sourceid.cpp124
-rw-r--r--ydb/core/persqueue/sourceid.h46
-rw-r--r--ydb/core/persqueue/ya.make1
-rw-r--r--ydb/core/protos/msgbus_pq.proto2
-rw-r--r--ydb/core/protos/pqconfig.proto7
-rw-r--r--ydb/core/tx/datashard/cdc_stream_heartbeat.cpp11
-rw-r--r--ydb/core/tx/datashard/change_record.cpp5
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp17
-rw-r--r--ydb/core/tx/datashard/datashard__init.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp2
21 files changed, 360 insertions, 39 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
index df9a5e8e9b5..e15e17c61c1 100644
--- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
@@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index 0526d277980..35c8a463994 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -51,6 +51,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
index 0526d277980..35c8a463994 100644
--- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
@@ -51,6 +51,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
index df9a5e8e9b5..e15e17c61c1 100644
--- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
@@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/event_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/fetch_request_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/header.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/heartbeat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index a12b0e00d79..0ec89e64a65 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -2,11 +2,13 @@
#include "global.h"
-#include <ydb/library/persqueue/topic_parser/topic_parser.h>
+#include <ydb/core/base/row_version.h>
#include <ydb/core/protos/pqconfig.pb.h>
-#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/metering_sink.h>
+#include <ydb/core/tablet/tablet_counters.h>
+#include <ydb/library/persqueue/topic_parser/topic_parser.h>
+
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/actorid.h>
@@ -161,6 +163,8 @@ struct TEvPQ {
TString ExplicitHashKey;
bool External;
bool IgnoreQuotaDeadline;
+ // If specified, Data will contain heartbeat's data
+ std::optional<TRowVersion> HeartbeatVersion;
};
TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite)
diff --git a/ydb/core/persqueue/heartbeat.cpp b/ydb/core/persqueue/heartbeat.cpp
new file mode 100644
index 00000000000..bb78daf08f0
--- /dev/null
+++ b/ydb/core/persqueue/heartbeat.cpp
@@ -0,0 +1,20 @@
+#include "heartbeat.h"
+
+#include <ydb/core/protos/pqconfig.pb.h>
+
+namespace NKikimr::NPQ {
+
+THeartbeat THeartbeat::Parse(const NKikimrPQ::THeartbeat& proto) {
+ return THeartbeat{
+ .Version = TRowVersion(proto.GetStep(), proto.GetTxId()),
+ .Data = proto.GetData(),
+ };
+}
+
+void THeartbeat::Serialize(NKikimrPQ::THeartbeat& proto) const {
+ proto.SetStep(Version.Step);
+ proto.SetTxId(Version.TxId);
+ proto.SetData(Data);
+}
+
+}
diff --git a/ydb/core/persqueue/heartbeat.h b/ydb/core/persqueue/heartbeat.h
new file mode 100644
index 00000000000..3363539f0a8
--- /dev/null
+++ b/ydb/core/persqueue/heartbeat.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <ydb/core/base/row_version.h>
+
+#include <util/generic/string.h>
+
+namespace NKikimrPQ {
+ class THeartbeat;
+}
+
+namespace NKikimr::NPQ {
+
+struct THeartbeat {
+ TRowVersion Version;
+ TString Data;
+
+ static THeartbeat Parse(const NKikimrPQ::THeartbeat& proto);
+ void Serialize(NKikimrPQ::THeartbeat& proto) const;
+};
+
+}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 1c1a5610de5..85f404cf884 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -198,7 +198,8 @@ private:
void UpdateWriteBufferIsFullState(const TInstant& now);
TInstant GetWriteTimeEstimate(ui64 offset) const;
- bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter);
+ bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
+ TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter);
bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx);
bool IsQuotingEnabled() const;
diff --git a/ydb/core/persqueue/partition_monitoring.cpp b/ydb/core/persqueue/partition_monitoring.cpp
index 0348762e2b5..aa8323861c8 100644
--- a/ydb/core/persqueue/partition_monitoring.cpp
+++ b/ydb/core/persqueue/partition_monitoring.cpp
@@ -201,6 +201,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
TABLEH() {out << "CreateTimestamp";}
TABLEH() {out << "Explicit";}
TABLEH() {out << "State";}
+ TABLEH() {out << "LastHeartbeat";}
}
}
TABLEBODY() {
@@ -213,6 +214,11 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
TABLED() {out << ToStringLocalTimeUpToSeconds(sourceIdInfo.CreateTimestamp);}
TABLED() {out << (sourceIdInfo.Explicit ? "true" : "false");}
TABLED() {out << sourceIdInfo.State;}
+ if (const auto& hb = sourceIdInfo.LastHeartbeat) {
+ TABLED() {out << hb->Version;}
+ } else {
+ TABLED() {out << "null";}
+ }
}
}
}
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 1fe69aa2623..63a69d0713d 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -9,6 +9,7 @@
#include <ydb/core/base/counters.h>
#include <ydb/core/base/path.h>
#include <ydb/core/quoter/public/quoter.h>
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/protos/counters_pq.pb.h>
#include <ydb/core/protos/msgbus.pb.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
@@ -306,10 +307,15 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
}
if (!already && partNo + 1 == totalParts) {
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
+ Y_VERIFY(!writeResponse.Msg.HeartbeatVersion);
TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
- SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp);
+ SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp);
+ } else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) {
+ SourceIdStorage.RegisterSourceId(s, it->second.Updated(
+ seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data}
+ ));
} else {
- SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp));
+ SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, offset, CurrentTimestamp));
}
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1);
@@ -332,7 +338,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
PartitionWriteQuotaWaitCounter->IncFor(quotedTime.MilliSeconds());
}
- if (!already && partNo + 1 == totalParts)
+ if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
++offset;
} else if (response.IsOwnership()) {
const TString& ownerCookie = response.GetOwnership().OwnerCookie;
@@ -795,7 +801,8 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T
}
bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
- TSourceIdWriter& sourceIdWriter) {
+ TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter)
+{
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition);
ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset;
@@ -899,6 +906,39 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
continue;
}
+ if (const auto& hbVersion = p.Msg.HeartbeatVersion) {
+ if (it_inMemory == SourceIdStorage.GetInMemorySourceIds().end()) {
+ CancelAllWritesOnWrite(ctx, request, TStringBuilder()
+ << "Cannot apply heartbeat on unknown sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter);
+ return false;
+ }
+ if (!it_inMemory->second.Explicit) {
+ CancelAllWritesOnWrite(ctx, request, TStringBuilder()
+ << "Cannot apply heartbeat on implcit sourceId: " << EscapeC(p.Msg.SourceId), p, sourceIdWriter);
+ return false;
+ }
+
+ LOG_DEBUG_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "Topic '" << TopicName() << "' partition " << Partition
+ << " process heartbeat sourceId '" << EscapeC(p.Msg.SourceId) << "'"
+ << " version " << *hbVersion
+ );
+
+ auto heartbeat = THeartbeat{
+ .Version = *hbVersion,
+ .Data = p.Msg.Data,
+ };
+
+ heartbeatEmitter.Process(p.Msg.SourceId, heartbeat);
+ sourceIdWriter.RegisterSourceId(p.Msg.SourceId, it_inMemory->second.Updated(
+ p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat)
+ ));
+
+ EmplaceResponse(std::move(pp), ctx);
+ continue;
+ }
+
if (poffset < curOffset) { //too small offset
CancelAllWritesOnWrite(ctx, request,
TStringBuilder() << "write message sourceId: " << EscapeC(p.Msg.SourceId) << " seqNo: " << p.Msg.SeqNo
@@ -1296,12 +1336,14 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
Y_VERIFY(request->Record.CmdWriteSize() == 0);
Y_VERIFY(request->Record.CmdRenameSize() == 0);
Y_VERIFY(request->Record.CmdDeleteRangeSize() == 0);
+
const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo()
? ESourceIdFormat::Proto
: ESourceIdFormat::Raw;
TSourceIdWriter sourceIdWriter(format);
+ THeartbeatEmitter heartbeatEmitter(SourceIdStorage);
- bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter);
+ bool headCleared = AppendHeadWithNewWrites(request, ctx, sourceIdWriter, heartbeatEmitter);
if (headCleared) {
Y_VERIFY(!CompactedKeys.empty() || Head.PackedSize == 0);
@@ -1310,6 +1352,34 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
}
}
+ if (const auto heartbeat = heartbeatEmitter.CanEmit()) {
+ LOG_INFO_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "Topic '" << TopicName() << "' partition " << Partition
+ << " emit heartbeat " << heartbeat->Version
+ );
+
+ EmplaceRequest(TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
+ .SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
+ .SeqNo = 0,
+ .PartNo = 0,
+ .TotalParts = 1,
+ .TotalSize = static_cast<ui32>(heartbeat->Data.size()),
+ .CreateTimestamp = CurrentTimestamp.MilliSeconds(),
+ .ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
+ .DisableDeduplication = true,
+ .WriteTimestamp = CurrentTimestamp.MilliSeconds(),
+ .Data = heartbeat->Data,
+ .UncompressedSize = 0,
+ .PartitionKey = {},
+ .ExplicitHashKey = {},
+ .External = false,
+ .IgnoreQuotaDeadline = true,
+ .HeartbeatVersion = std::nullopt,
+ }}, ctx);
+ WriteInflightSize += heartbeat->Data.size();
+ }
+
if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed
if (sourceIdWriter.GetSourceIdsToWrite().empty()) {
return request->Record.CmdWriteSize() > 0
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 6503e3b4e09..236b6ebac3f 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -30,6 +30,7 @@ static constexpr char TMP_REQUEST_MARKER[] = "__TMP__REQUEST__MARKER__";
static constexpr ui32 CACHE_SIZE = 100_MB;
static constexpr ui32 MAX_BYTES = 25_MB;
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
+static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
struct TPartitionInfo {
TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange,
@@ -1757,7 +1758,9 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
TString errorStr = "";
if (!cmd.HasSeqNo() && !req.GetIsDirectWrite()) {
errorStr = "no SeqNo";
- } else if (!cmd.HasData() || cmd.GetData().empty()){
+ } else if (cmd.HasData() && cmd.HasHeartbeat()) {
+ errorStr = "Data and Heartbeat are mutually exclusive";
+ } else if (cmd.GetData().empty() && cmd.GetHeartbeat().GetData().empty()) {
errorStr = "empty Data";
} else if ((!cmd.HasSourceId() || cmd.GetSourceId().empty()) && !req.GetIsDirectWrite() && !cmd.GetDisableDeduplication()) {
errorStr = "empty SourceId";
@@ -1779,6 +1782,10 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
errorStr = "Too big SourceId";
} else if (mirroredPartition && !cmd.GetDisableDeduplication()) {
errorStr = "Write to mirrored topic is forbiden";
+ } else if (cmd.HasHeartbeat() && cmd.GetHeartbeat().GetData().size() > MAX_HEARTBEAT_SIZE) {
+ errorStr = "Too big Heartbeat";
+ } else if (cmd.HasHeartbeat() && cmd.HasTotalParts() && cmd.GetTotalParts() != 1) {
+ errorStr = "Heartbeat must be a single-part message";
}
ui64 createTimestampMs = 0, writeTimestampMs = 0;
if (cmd.HasCreateTimeMS() && cmd.GetCreateTimeMS() >= 0)
@@ -1798,6 +1805,12 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
Y_VERIFY(mSize > 204800);
ui64 receiveTimestampMs = TAppData::TimeProvider->Now().MilliSeconds();
bool disableDeduplication = cmd.GetDisableDeduplication();
+
+ std::optional<TRowVersion> heartbeatVersion;
+ if (cmd.HasHeartbeat()) {
+ heartbeatVersion.emplace(cmd.GetHeartbeat().GetStep(), cmd.GetHeartbeat().GetTxId());
+ }
+
if (cmd.GetData().size() > mSize) {
if (cmd.HasPartNo()) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST,
@@ -1826,7 +1839,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
totalParts, totalSize, createTimestampMs, receiveTimestampMs,
disableDeduplication, writeTimestampMs, data, uncompressedSize,
cmd.GetPartitionKey(), cmd.GetExplicitHash(), cmd.GetExternalOperation(),
- cmd.GetIgnoreQuotaDeadline()
+ cmd.GetIgnoreQuotaDeadline(), heartbeatVersion
});
partNo++;
uncompressedSize = 0;
@@ -1840,13 +1853,29 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
);
}
Y_VERIFY(partNo == totalParts);
+ } else if (cmd.GetHeartbeat().GetData().size() > mSize) {
+ Y_VERIFY_DEBUG(false, "This should never happen");
+ ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder()
+ << "Too big heartbeat message, must be at most " << mSize << ", but got " << cmd.GetHeartbeat().GetData().size());
+ return;
} else {
+ ui32 totalSize = cmd.GetData().Size();
+ if (cmd.HasHeartbeat()) {
+ totalSize = cmd.GetHeartbeat().GetData().Size();
+ }
+ if (cmd.HasTotalSize()) {
+ totalSize = cmd.GetTotalSize();
+ }
+
+ const auto& data = cmd.HasHeartbeat()
+ ? cmd.GetHeartbeat().GetData()
+ : cmd.GetData();
+
msgs.push_back({cmd.GetSourceId(), static_cast<ui64>(cmd.GetSeqNo()), static_cast<ui16>(cmd.HasPartNo() ? cmd.GetPartNo() : 0),
- static_cast<ui16>(cmd.HasPartNo() ? cmd.GetTotalParts() : 1),
- static_cast<ui32>(cmd.HasTotalSize() ? cmd.GetTotalSize() : cmd.GetData().Size()),
- createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, cmd.GetData(),
+ static_cast<ui16>(cmd.HasPartNo() ? cmd.GetTotalParts() : 1), totalSize,
+ createTimestampMs, receiveTimestampMs, disableDeduplication, writeTimestampMs, data,
cmd.HasUncompressedSize() ? cmd.GetUncompressedSize() : 0u, cmd.GetPartitionKey(), cmd.GetExplicitHash(),
- cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline()
+ cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline(), heartbeatVersion
});
}
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp
index 2eb8ea0a40f..1a0c1528f05 100644
--- a/ydb/core/persqueue/sourceid.cpp
+++ b/ydb/core/persqueue/sourceid.cpp
@@ -7,8 +7,7 @@
#include <algorithm>
-namespace NKikimr {
-namespace NPQ {
+namespace NKikimr::NPQ {
static constexpr ui64 MAX_DELETE_COMMAND_SIZE = 10_MB;
static constexpr ui64 MAX_DELETE_COMMAND_COUNT = 1000;
@@ -80,6 +79,34 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark,
void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd);
}
+THeartbeatProcessor::THeartbeatProcessor(
+ const THashSet<TString>& sourceIdsWithHeartbeat,
+ const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat)
+ : SourceIdsWithHeartbeat(sourceIdsWithHeartbeat)
+ , SourceIdsByHeartbeat(sourceIdsByHeartbeat)
+{
+}
+
+void THeartbeatProcessor::ApplyHeartbeat(const TString& sourceId, const TRowVersion& version) {
+ SourceIdsWithHeartbeat.insert(sourceId);
+ SourceIdsByHeartbeat[version].insert(sourceId);
+}
+
+void THeartbeatProcessor::ForgetHeartbeat(const TString& sourceId, const TRowVersion& version) {
+ auto it = SourceIdsByHeartbeat.find(version);
+ if (it == SourceIdsByHeartbeat.end()) {
+ return;
+ }
+
+ it->second.erase(sourceId);
+ if (it->second.empty()) {
+ SourceIdsByHeartbeat.erase(it);
+ }
+}
+
+void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) {
+ SourceIdsWithHeartbeat.erase(sourceId);
+}
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
: SeqNo(seqNo)
@@ -110,6 +137,13 @@ TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs)
return copy;
}
+TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const {
+ auto copy = Updated(seqNo, offset, writeTs);
+ copy.LastHeartbeat = std::move(heartbeat);
+
+ return copy;
+}
+
TSourceIdInfo TSourceIdInfo::Parse(const TString& data, TInstant now) {
Y_VERIFY(data.size() >= 2 * sizeof(ui64), "Data must contain at least SeqNo & Offset");
ui32 pos = 0;
@@ -152,6 +186,9 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
if (proto.HasKeyRange()) {
result.KeyRange = TPartitionKeyRange::Parse(proto.GetKeyRange());
}
+ if (proto.HasLastHeartbeat()) {
+ result.LastHeartbeat = THeartbeat::Parse(proto.GetLastHeartbeat());
+ }
return result;
}
@@ -168,6 +205,9 @@ void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const {
if (KeyRange) {
KeyRange->Serialize(*proto.MutableKeyRange());
}
+ if (LastHeartbeat) {
+ LastHeartbeat->Serialize(*proto.MutableLastHeartbeat());
+ }
}
bool TSourceIdInfo::operator==(const TSourceIdInfo& rhs) const {
@@ -202,6 +242,15 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) {
return;
}
+ ForgetSourceId(sourceId);
+ if (const auto& heartbeat = it->second.LastHeartbeat) {
+ ForgetHeartbeat(sourceId, heartbeat->Version);
+ }
+
+ if (it->second.Explicit) {
+ ExplicitSourceIds.erase(sourceId);
+ }
+
SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId));
InMemorySourceIds.erase(it);
@@ -328,11 +377,24 @@ void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdIn
}
}
- const auto offset = sourceIdInfo.Offset;
- InMemorySourceIds[sourceId] = std::move(sourceIdInfo);
-
- const bool res = SourceIdsByOffset.emplace(offset, sourceId).second;
+ const bool res = SourceIdsByOffset.emplace(sourceIdInfo.Offset, sourceId).second;
Y_VERIFY(res);
+
+ if (sourceIdInfo.Explicit) {
+ ExplicitSourceIds.insert(sourceId);
+ }
+
+ if (const auto& heartbeat = sourceIdInfo.LastHeartbeat) {
+ Y_VERIFY(sourceIdInfo.Explicit);
+
+ if (it != InMemorySourceIds.end() && it->second.LastHeartbeat) {
+ ForgetHeartbeat(sourceId, it->second.LastHeartbeat->Version);
+ }
+
+ ApplyHeartbeat(sourceId, heartbeat->Version);
+ }
+
+ InMemorySourceIds[sourceId] = std::move(sourceIdInfo);
}
void TSourceIdStorage::RegisterSourceIdOwner(const TString& sourceId, const TStringBuf& ownerCookie) {
@@ -428,5 +490,51 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti
}
}
-} // NPQ
-} // NKikimr
+/// THeartbeatEmitter
+THeartbeatEmitter::THeartbeatEmitter(const TSourceIdStorage& storage)
+ : THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat)
+ , Storage(storage)
+{
+}
+
+void THeartbeatEmitter::Process(const TString& sourceId, const THeartbeat& heartbeat) {
+ Y_VERIFY(Storage.InMemorySourceIds.contains(sourceId));
+ const auto& sourceIdInfo = Storage.InMemorySourceIds.at(sourceId);
+
+ if (const auto& lastHeartbeat = sourceIdInfo.LastHeartbeat) {
+ ForgetHeartbeat(sourceId, lastHeartbeat->Version);
+ }
+
+ if (LastHeartbeats.contains(sourceId)) {
+ ForgetHeartbeat(sourceId, LastHeartbeats.at(sourceId).Version);
+ }
+
+ ApplyHeartbeat(sourceId, heartbeat.Version);
+ LastHeartbeats[sourceId] = heartbeat;
+}
+
+TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const {
+ if (SourceIdsWithHeartbeat.size() != Storage.ExplicitSourceIds.size()) {
+ return Nothing();
+ }
+
+ if (SourceIdsByHeartbeat.empty()) {
+ return Nothing();
+ }
+
+ auto it = SourceIdsByHeartbeat.begin();
+ if (Storage.SourceIdsByHeartbeat.empty() || it->first > Storage.SourceIdsByHeartbeat.begin()->first) {
+ Y_VERIFY(!it->second.empty());
+ const auto& someSourceId = *it->second.begin();
+
+ if (LastHeartbeats.contains(someSourceId)) {
+ return LastHeartbeats.at(someSourceId);
+ } else if (Storage.InMemorySourceIds.contains(someSourceId)) {
+ return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat;
+ }
+ }
+
+ return Nothing();
+}
+
+}
diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h
index 62b9c2834e2..1c6cfd34bf8 100644
--- a/ydb/core/persqueue/sourceid.h
+++ b/ydb/core/persqueue/sourceid.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/keyvalue/keyvalue_events.h>
+#include <ydb/core/persqueue/heartbeat.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/ownerinfo.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
@@ -8,8 +9,7 @@
#include <util/generic/set.h>
-namespace NKikimr {
-namespace NPQ {
+namespace NKikimr::NPQ {
enum class ESourceIdFormat: ui8 {
Raw = 0,
@@ -29,6 +29,7 @@ struct TSourceIdInfo {
TInstant CreateTimestamp;
bool Explicit = false;
TMaybe<TPartitionKeyRange> KeyRange;
+ TMaybe<THeartbeat> LastHeartbeat;
EState State = EState::Registered;
TSourceIdInfo() = default;
@@ -36,6 +37,7 @@ struct TSourceIdInfo {
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const;
+ TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const;
static EState ConvertState(NKikimrPQ::TMessageGroupInfo::EState value);
static NKikimrPQ::TMessageGroupInfo::EState ConvertState(EState value);
@@ -55,9 +57,29 @@ struct TSourceIdInfo {
}; // TSourceIdInfo
+class THeartbeatProcessor {
+public:
+ THeartbeatProcessor() = default;
+ explicit THeartbeatProcessor(
+ const THashSet<TString>& sourceIdsWithHeartbeat,
+ const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat);
+
+ void ApplyHeartbeat(const TString& sourceId, const TRowVersion& version);
+ void ForgetHeartbeat(const TString& sourceId, const TRowVersion& version);
+ void ForgetSourceId(const TString& sourceId);
+
+protected:
+ THashSet<TString> SourceIdsWithHeartbeat;
+ TMap<TRowVersion, THashSet<TString>> SourceIdsByHeartbeat;
+
+}; // THeartbeatProcessor
+
using TSourceIdMap = THashMap<TString, TSourceIdInfo>;
+class THeartbeatEmitter;
+
+class TSourceIdStorage: private THeartbeatProcessor {
+ friend class THeartbeatEmitter;
-class TSourceIdStorage {
public:
const TSourceIdMap& GetInMemorySourceIds() const {
return InMemorySourceIds;
@@ -88,6 +110,8 @@ private:
THashMap<TString, TString> SourceIdOwners;
TVector<TString> OwnersToDrop;
TSet<std::pair<ui64, TString>> SourceIdsByOffset;
+ // used to track heartbeats
+ THashSet<TString> ExplicitSourceIds;
}; // TSourceIdStorage
@@ -122,8 +146,20 @@ private:
}; // TSourceIdWriter
-} // NPQ
-} // NKikimr
+class THeartbeatEmitter: private THeartbeatProcessor {
+public:
+ explicit THeartbeatEmitter(const TSourceIdStorage& storage);
+
+ void Process(const TString& sourceId, const THeartbeat& heartbeat);
+ TMaybe<THeartbeat> CanEmit() const;
+
+private:
+ const TSourceIdStorage& Storage;
+ THashMap<TString, THeartbeat> LastHeartbeats;
+
+}; // THeartbeatEmitter
+
+}
Y_DECLARE_OUT_SPEC(inline, NKikimr::NPQ::TSourceIdInfo, out, value) {
return value.Out(out);
diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make
index 5b22490e45c..435108c3276 100644
--- a/ydb/core/persqueue/ya.make
+++ b/ydb/core/persqueue/ya.make
@@ -7,6 +7,7 @@ SRCS(
event_helpers.cpp
fetch_request_actor.cpp
header.cpp
+ heartbeat.cpp
metering_sink.cpp
mirrorer.cpp
mirrorer.h
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index d3d09f1b3f1..ce29023c222 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -74,6 +74,8 @@ message TPersQueuePartitionRequest {
// Do not reject request when quota exceeded
optional bool IgnoreQuotaDeadline = 16 [ default = false ];
+
+ optional NKikimrPQ.THeartbeat Heartbeat = 17; // mutually exclusive with Data & related fields
}
message TCmdUpdateWriteTimestamp {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 84b21954cb2..a67b5a4c276 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -359,6 +359,12 @@ message TPQTabletConfig {
optional TPartitionStrategy PartitionStrategy = 35;
}
+message THeartbeat {
+ optional uint64 Step = 1;
+ optional uint64 TxId = 2;
+ optional bytes Data = 3;
+}
+
message TMessageGroupInfo {
enum EState {
STATE_UNKNOWN = 0;
@@ -373,6 +379,7 @@ message TMessageGroupInfo {
optional bool Explicit = 5;
optional TPartitionKeyRange KeyRange = 6;
optional EState State = 7;
+ optional THeartbeat LastHeartbeat = 8;
}
message TBootstrapConfig {
diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
index 06e39899592..21da0859792 100644
--- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
@@ -33,8 +33,9 @@ public:
TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_EMIT_HEARTBEATS; }
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
- LOG_I("Emit change records till version"
- << ": " << Edge);
+ LOG_I("Emit change records"
+ << ": edge# " << Edge
+ << ", at tablet# " << Self->TabletID());
NIceDb::TNiceDb db(txc.DB);
const auto heartbeats = Self->GetCdcStreamHeartbeatManager().EmitHeartbeats(txc.DB, Edge);
@@ -68,7 +69,8 @@ public:
}
void Complete(const TActorContext& ctx) override {
- LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)");
+ LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"
+ << ": at tablet# " << Self->TabletID());
Self->EnqueueChangeRecords(std::move(ChangeRecords));
Self->EmitHeartbeats(ctx);
}
@@ -76,6 +78,9 @@ public:
}; // TTxCdcStreamEmitHeartbeats
void TDataShard::EmitHeartbeats(const TActorContext& ctx) {
+ LOG_D("Emit heartbeats"
+ << ": at tablet# " << TabletID());
+
if (State != TShardState::Ready) {
return;
}
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 12a3dc815eb..c053694b59b 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -390,10 +390,7 @@ TString TChangeRecord::GetPartitionKey() const {
break;
}
- case EKind::CdcHeartbeat: {
- return {}; // not used
- }
-
+ case EKind::CdcHeartbeat:
case EKind::AsyncIndex: {
Y_FAIL("Not supported");
}
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 89341a2fe9b..2cbb316f66e 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -107,6 +107,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
NKikimrChangeExchange::TChangeRecord protoRecord;
record.SerializeToProto(protoRecord);
data.SetData(protoRecord.SerializeAsString());
+ cmd.SetData(data.SerializeAsString());
break;
}
@@ -124,9 +125,20 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
jsonConfig.ValidateUtf8 = false;
jsonConfig.WriteNanAsString = true;
WriteJson(&str, &json, jsonConfig);
-
data.SetData(str.Str());
- cmd.SetPartitionKey(record.GetPartitionKey());
+
+ if (record.GetKind() == TChangeRecord::EKind::CdcDataChange) {
+ cmd.SetData(data.SerializeAsString());
+ cmd.SetPartitionKey(record.GetPartitionKey());
+ } else if (record.GetKind() == TChangeRecord::EKind::CdcHeartbeat) {
+ auto& heartbeat = *cmd.MutableHeartbeat();
+ heartbeat.SetStep(record.GetStep());
+ heartbeat.SetTxId(record.GetTxId());
+ heartbeat.SetData(data.SerializeAsString());
+ } else {
+ Y_FAIL_S("Unexpected cdc record"
+ << ": kind# " << record.GetKind());
+ }
break;
}
@@ -137,7 +149,6 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
}
}
- cmd.SetData(data.SerializeAsString());
Pending.push_back(record.GetSeqNo());
}
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index bb726a3b351..98d2e31aaf5 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -114,6 +114,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) {
Self->CreateChangeSender(ctx);
Self->EnqueueChangeRecords(std::move(ChangeRecords));
Self->MaybeActivateChangeSender(ctx);
+ Self->EmitHeartbeats(ctx);
if (!Self->ChangesQueue) {
if (!Self->ChangeExchangeSplitter.Done()) {
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 91b1a52b714..2082c049a26 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2751,7 +2751,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"update":{"value":30},"key":[3]})",
R"({"resolved":"***"})",
R"({"resolved":"***"})",
- R"({"resolved":"***"})",
});
// disable stream
@@ -2765,7 +2764,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"update":{"value":30},"key":[3]})",
R"({"resolved":"***"})",
R"({"resolved":"***"})",
- R"({"resolved":"***"})",
});
}