aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-07-17 16:26:27 +0300
committerilnaz <ilnaz@ydb.tech>2023-07-17 16:26:27 +0300
commitfed8a4fbdf447f4b6525375a72a575753966ef27 (patch)
treea2116ef8ab30239afa066dd059d205f602f06ba3
parentda96ac06774420829265a7de590fad0781dbc073 (diff)
downloadydb-fed8a4fbdf447f4b6525375a72a575753966ef27.tar.gz
Continue emitting heartbeats after split/merge KIKIMR-18159
-rw-r--r--ydb/core/tx/datashard/cdc_stream_heartbeat.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_split_dst.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp18
4 files changed, 29 insertions, 1 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
index 41207bc4f7..06e3989959 100644
--- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
@@ -76,6 +76,10 @@ public:
}; // TTxCdcStreamEmitHeartbeats
void TDataShard::EmitHeartbeats(const TActorContext& ctx) {
+ if (State != TShardState::Ready) {
+ return;
+ }
+
const auto lowest = CdcStreamHeartbeatManager.LowestVersion();
if (lowest.IsMax()) {
return;
diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp
index 2d33d53ee3..a705349a75 100644
--- a/ydb/core/tx/datashard/datashard_split_dst.cpp
+++ b/ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -55,6 +55,12 @@ public:
const ui64 txId = Ev->Get()->Record.GetOperationCookie();
Self->AddSchemaSnapshot(tableId, info->GetTableSchemaVersion(), 0, txId, txc, ctx);
}
+
+ for (const auto& [streamId, stream] : info->CdcStreams) {
+ if (const auto heartbeatInterval = stream.ResolvedTimestampsInterval) {
+ Self->GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, tableId, streamId, heartbeatInterval);
+ }
+ }
}
Self->DstSplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>(Ev->Get()->Record.GetSplitDescription());
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 476d4214fb..aa74750b9a 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -295,6 +295,7 @@ struct TUserTable : public TThrRefBase {
EFormat Format;
EState State;
bool VirtualTimestamps = false;
+ TDuration ResolvedTimestampsInterval;
TMaybe<TString> AwsRegion;
TCdcStream() = default;
@@ -305,6 +306,7 @@ struct TUserTable : public TThrRefBase {
, Format(streamDesc.GetFormat())
, State(streamDesc.GetState())
, VirtualTimestamps(streamDesc.GetVirtualTimestamps())
+ , ResolvedTimestampsInterval(TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs()))
{
if (const auto& awsRegion = streamDesc.GetAwsRegion()) {
AwsRegion = awsRegion;
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index c8eab0b722..a6f4b7e9ba 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2716,7 +2716,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
- WithResolvedTimestamps(TDuration::Seconds(5), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+ WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"resolved":"***"})",
@@ -2736,6 +2736,22 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"update":{"value":30},"key":[3]})",
R"({"resolved":"***"})",
});
+
+ const auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table");
+ UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
+
+ SetSplitMergePartCountLimit(&runtime, -1);
+ WaitTxNotification(server, edgeActor, AsyncSplitTable(server, edgeActor, "/Root/Table", tabletIds.at(0), 2));
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"resolved":"***"})",
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ R"({"resolved":"***"})",
+ R"({"resolved":"***"})",
+ R"({"resolved":"***"})",
+ });
}
} // Cdc