diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-07-06 13:34:14 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-07-06 13:34:14 +0300 |
commit | 6224485996e3928236bca24e1e526a91d492f569 (patch) | |
tree | 1aa99f819dfe3b9f113e196d86055590b0a3323b | |
parent | 512da60c5d031e9d3ac0b96c7ecb8b0891267ed0 (diff) | |
download | ydb-6224485996e3928236bca24e1e526a91d492f569.tar.gz |
Do not fail after scan is completed KIKIMR-18613
-rw-r--r-- | ydb/core/tx/datashard/change_collector_cdc_stream.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 63 |
2 files changed, 64 insertions, 1 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index e2e7af0e5e..f60172d371 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -209,7 +209,7 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } } } else { - Y_FAIL_S("Cannot retrieve cdc stream scan info: " << pathId); + // nop, scan is completed } break; default: diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 5e67c20e02..e79f7d7db5 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2439,6 +2439,69 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(InitialScanComplete) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20); + )"); + + THolder<IEventHandle> delayed; + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == NSchemeShard::TEvSchemeShard::EvModifySchemeTransaction) { + auto* msg = ev->Get<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransaction>(); + const auto& tx = msg->Record.GetTransaction(0); + if (tx.HasAlterCdcStream() && tx.GetAlterCdcStream().HasGetReady()) { + delayed.Reset(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + if (!delayed) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) { + return bool(delayed); + }); + runtime.DispatchEvents(opts); + } + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (3, 30), + (4, 40); + )"); + + runtime.SetObserverFunc(prevObserver); + runtime.Send(delayed.Release(), 0, true); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + R"({"update":{"value":40},"key":[4]})", + }); + } + Y_UNIT_TEST(AwsRegion) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) |