aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-07-17 18:02:45 +0300
committerilnaz <ilnaz@ydb.tech>2023-07-17 18:02:45 +0300
commit54c2702ef9a5f7fcff23ef380009baf2f13c9737 (patch)
tree9c8b2d47032e736f9f906f2a7dbf5bf868095afa
parent3faf3cf568022f471b9baadf091ea93089ba9be3 (diff)
downloadydb-54c2702ef9a5f7fcff23ef380009baf2f13c9737.tar.gz
Do not emit heartbeats during initial scan KIKIMR-18159
-rw-r--r--ydb/core/tx/datashard/alter_cdc_stream_unit.cpp29
-rw-r--r--ydb/core/tx/datashard/create_cdc_stream_unit.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp58
3 files changed, 80 insertions, 13 deletions
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
index cca1f497ea..bf3baa3e55 100644
--- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
@@ -41,18 +41,24 @@ public:
TUserTable::TPtr tableInfo;
switch (state) {
case NKikimrSchemeOp::ECdcStreamStateDisabled:
+ tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state);
+ break;
+
case NKikimrSchemeOp::ECdcStreamStateReady:
tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state);
- if (state == NKikimrSchemeOp::ECdcStreamStateReady) {
- if (params.HasDropSnapshot()) {
- const auto& snapshot = params.GetDropSnapshot();
- Y_VERIFY(snapshot.GetStep() != 0);
-
- const TSnapshotKey key(pathId, snapshot.GetStep(), snapshot.GetTxId());
- DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key);
- } else {
- Y_VERIFY_DEBUG(false, "Absent snapshot");
- }
+
+ if (params.HasDropSnapshot()) {
+ const auto& snapshot = params.GetDropSnapshot();
+ Y_VERIFY(snapshot.GetStep() != 0);
+
+ const TSnapshotKey key(pathId, snapshot.GetStep(), snapshot.GetTxId());
+ DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key);
+ } else {
+ Y_VERIFY_DEBUG(false, "Absent snapshot");
+ }
+
+ if (const auto heartbeatInterval = TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs())) {
+ DataShard.GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, pathId, streamPathId, heartbeatInterval);
}
break;
@@ -81,7 +87,8 @@ public:
return EExecutionStatus::DelayCompleteNoMoreRestarts;
}
- void Complete(TOperation::TPtr, const TActorContext&) override {
+ void Complete(TOperation::TPtr, const TActorContext& ctx) override {
+ DataShard.EmitHeartbeats(ctx);
}
};
diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
index d38369a008..03a0d2ed55 100644
--- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
@@ -58,8 +58,10 @@ public:
pathId, streamPathId, TRowVersion(tx->GetStep(), tx->GetTxId()));
}
- if (const auto heartbeatInterval = TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs())) {
- DataShard.GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, pathId, streamPathId, heartbeatInterval);
+ if (streamDesc.GetState() == NKikimrSchemeOp::ECdcStreamStateReady) {
+ if (const auto heartbeatInterval = TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs())) {
+ DataShard.GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, pathId, streamPathId, heartbeatInterval);
+ }
}
AddSender.Reset(new TEvChangeExchange::TEvAddSender(
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index a6f4b7e9ba..43c221da89 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2754,6 +2754,64 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(InitialScanAndResolvedTimestamps) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableChangefeedInitialScan(true)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ THolder<IEventHandle> delayed;
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvCdcStreamScanRequest) {
+ delayed.Reset(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ WithResolvedTimestamps(TDuration::Seconds(3),
+ WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))
+ )
+ ));
+
+ if (!delayed) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) {
+ return bool(delayed);
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ SimulateSleep(server, TDuration::Seconds(5));
+ runtime.SetObserverFunc(prevObserver);
+ runtime.Send(delayed.Release(), 0, true);
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ R"({"resolved":"***"})",
+ });
+ }
+
} // Cdc
} // NKikimr