diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-07-17 16:26:27 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-07-17 16:26:27 +0300 |
commit | fed8a4fbdf447f4b6525375a72a575753966ef27 (patch) | |
tree | a2116ef8ab30239afa066dd059d205f602f06ba3 | |
parent | da96ac06774420829265a7de590fad0781dbc073 (diff) | |
download | ydb-fed8a4fbdf447f4b6525375a72a575753966ef27.tar.gz |
Continue emitting heartbeats after split/merge KIKIMR-18159
-rw-r--r-- | ydb/core/tx/datashard/cdc_stream_heartbeat.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_split_dst.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 18 |
4 files changed, 29 insertions, 1 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp index 41207bc4f7..06e3989959 100644 --- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp +++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp @@ -76,6 +76,10 @@ public: }; // TTxCdcStreamEmitHeartbeats void TDataShard::EmitHeartbeats(const TActorContext& ctx) { + if (State != TShardState::Ready) { + return; + } + const auto lowest = CdcStreamHeartbeatManager.LowestVersion(); if (lowest.IsMax()) { return; diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp index 2d33d53ee3..a705349a75 100644 --- a/ydb/core/tx/datashard/datashard_split_dst.cpp +++ b/ydb/core/tx/datashard/datashard_split_dst.cpp @@ -55,6 +55,12 @@ public: const ui64 txId = Ev->Get()->Record.GetOperationCookie(); Self->AddSchemaSnapshot(tableId, info->GetTableSchemaVersion(), 0, txId, txc, ctx); } + + for (const auto& [streamId, stream] : info->CdcStreams) { + if (const auto heartbeatInterval = stream.ResolvedTimestampsInterval) { + Self->GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, tableId, streamId, heartbeatInterval); + } + } } Self->DstSplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>(Ev->Get()->Record.GetSplitDescription()); diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 476d4214fb..aa74750b9a 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -295,6 +295,7 @@ struct TUserTable : public TThrRefBase { EFormat Format; EState State; bool VirtualTimestamps = false; + TDuration ResolvedTimestampsInterval; TMaybe<TString> AwsRegion; TCdcStream() = default; @@ -305,6 +306,7 @@ struct TUserTable : public TThrRefBase { , Format(streamDesc.GetFormat()) , State(streamDesc.GetState()) , VirtualTimestamps(streamDesc.GetVirtualTimestamps()) + , ResolvedTimestampsInterval(TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs())) { if (const auto& awsRegion = streamDesc.GetAwsRegion()) { AwsRegion = awsRegion; diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index c8eab0b722..a6f4b7e9ba 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2716,7 +2716,7 @@ Y_UNIT_TEST_SUITE(Cdc) { CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", - WithResolvedTimestamps(TDuration::Seconds(5), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); WaitForContent(server, edgeActor, "/Root/Table/Stream", { R"({"resolved":"***"})", @@ -2736,6 +2736,22 @@ Y_UNIT_TEST_SUITE(Cdc) { R"({"update":{"value":30},"key":[3]})", R"({"resolved":"***"})", }); + + const auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + SetSplitMergePartCountLimit(&runtime, -1); + WaitTxNotification(server, edgeActor, AsyncSplitTable(server, edgeActor, "/Root/Table", tabletIds.at(0), 2)); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + R"({"resolved":"***"})", + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); } } // Cdc |