aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-07-06 13:34:14 +0300
committerilnaz <ilnaz@ydb.tech>2023-07-06 13:34:14 +0300
commit6224485996e3928236bca24e1e526a91d492f569 (patch)
tree1aa99f819dfe3b9f113e196d86055590b0a3323b
parent512da60c5d031e9d3ac0b96c7ecb8b0891267ed0 (diff)
downloadydb-6224485996e3928236bca24e1e526a91d492f569.tar.gz
Do not fail after scan is completed KIKIMR-18613
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp63
2 files changed, 64 insertions, 1 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
index e2e7af0e5e..f60172d371 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
@@ -209,7 +209,7 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
}
} else {
- Y_FAIL_S("Cannot retrieve cdc stream scan info: " << pathId);
+ // nop, scan is completed
}
break;
default:
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 5e67c20e02..e79f7d7db5 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2439,6 +2439,69 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(InitialScanComplete) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableChangefeedInitialScan(true)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20);
+ )");
+
+ THolder<IEventHandle> delayed;
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == NSchemeShard::TEvSchemeShard::EvModifySchemeTransaction) {
+ auto* msg = ev->Get<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransaction>();
+ const auto& tx = msg->Record.GetTransaction(0);
+ if (tx.HasAlterCdcStream() && tx.GetAlterCdcStream().HasGetReady()) {
+ delayed.Reset(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+
+ if (!delayed) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) {
+ return bool(delayed);
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (3, 30),
+ (4, 40);
+ )");
+
+ runtime.SetObserverFunc(prevObserver);
+ runtime.Send(delayed.Release(), 0, true);
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ R"({"update":{"value":40},"key":[4]})",
+ });
+ }
+
Y_UNIT_TEST(AwsRegion) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())