diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-07-17 18:02:45 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-07-17 18:02:45 +0300 |
commit | 54c2702ef9a5f7fcff23ef380009baf2f13c9737 (patch) | |
tree | 9c8b2d47032e736f9f906f2a7dbf5bf868095afa | |
parent | 3faf3cf568022f471b9baadf091ea93089ba9be3 (diff) | |
download | ydb-54c2702ef9a5f7fcff23ef380009baf2f13c9737.tar.gz |
Do not emit heartbeats during initial scan KIKIMR-18159
-rw-r--r-- | ydb/core/tx/datashard/alter_cdc_stream_unit.cpp | 29 | ||||
-rw-r--r-- | ydb/core/tx/datashard/create_cdc_stream_unit.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 58 |
3 files changed, 80 insertions, 13 deletions
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp index cca1f497ea..bf3baa3e55 100644 --- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -41,18 +41,24 @@ public: TUserTable::TPtr tableInfo; switch (state) { case NKikimrSchemeOp::ECdcStreamStateDisabled: + tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state); + break; + case NKikimrSchemeOp::ECdcStreamStateReady: tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state); - if (state == NKikimrSchemeOp::ECdcStreamStateReady) { - if (params.HasDropSnapshot()) { - const auto& snapshot = params.GetDropSnapshot(); - Y_VERIFY(snapshot.GetStep() != 0); - - const TSnapshotKey key(pathId, snapshot.GetStep(), snapshot.GetTxId()); - DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key); - } else { - Y_VERIFY_DEBUG(false, "Absent snapshot"); - } + + if (params.HasDropSnapshot()) { + const auto& snapshot = params.GetDropSnapshot(); + Y_VERIFY(snapshot.GetStep() != 0); + + const TSnapshotKey key(pathId, snapshot.GetStep(), snapshot.GetTxId()); + DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key); + } else { + Y_VERIFY_DEBUG(false, "Absent snapshot"); + } + + if (const auto heartbeatInterval = TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs())) { + DataShard.GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, pathId, streamPathId, heartbeatInterval); } break; @@ -81,7 +87,8 @@ public: return EExecutionStatus::DelayCompleteNoMoreRestarts; } - void Complete(TOperation::TPtr, const TActorContext&) override { + void Complete(TOperation::TPtr, const TActorContext& ctx) override { + DataShard.EmitHeartbeats(ctx); } }; diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp index d38369a008..03a0d2ed55 100644 --- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp @@ -58,8 +58,10 @@ public: pathId, streamPathId, TRowVersion(tx->GetStep(), tx->GetTxId())); } - if (const auto heartbeatInterval = TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs())) { - DataShard.GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, pathId, streamPathId, heartbeatInterval); + if (streamDesc.GetState() == NKikimrSchemeOp::ECdcStreamStateReady) { + if (const auto heartbeatInterval = TDuration::MilliSeconds(streamDesc.GetResolvedTimestampsIntervalMs())) { + DataShard.GetCdcStreamHeartbeatManager().AddCdcStream(txc.DB, pathId, streamPathId, heartbeatInterval); + } } AddSender.Reset(new TEvChangeExchange::TEvAddSender( diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index a6f4b7e9ba..43c221da89 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2754,6 +2754,64 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(InitialScanAndResolvedTimestamps) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + THolder<IEventHandle> delayed; + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvCdcStreamScanRequest) { + delayed.Reset(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithResolvedTimestamps(TDuration::Seconds(3), + WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson)) + ) + )); + + if (!delayed) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) { + return bool(delayed); + }); + runtime.DispatchEvents(opts); + } + + SimulateSleep(server, TDuration::Seconds(5)); + runtime.SetObserverFunc(prevObserver); + runtime.Send(delayed.Release(), 0, true); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + R"({"resolved":"***"})", + }); + } + } // Cdc } // NKikimr |