summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <[email protected]>2023-06-28 22:41:00 +0300
committerilnaz <[email protected]>2023-06-28 22:41:00 +0300
commit13155eed3e752c8239f113b67767a2cc74ea0f17 (patch)
tree951edab2466fa50e7fbce7b861e3e1101b203a7a
parentfef9e85477600ab08d07ff1fbfa09f8096a13074 (diff)
Do not lose sender when rescheduling
-rw-r--r--ydb/core/tx/datashard/cdc_stream_scan.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp73
3 files changed, 75 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp
index 44761764bab..ffd8ad5e5f8 100644
--- a/ydb/core/tx/datashard/cdc_stream_scan.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp
@@ -320,7 +320,7 @@ public:
<< ": streamPathId# " << Request->Get()->StreamPathId);
// re-schedule tx
- ctx.Schedule(TDuration::Seconds(1), Request->Release().Release());
+ ctx.TActivationContext::Schedule(TDuration::Seconds(1), Request->Forward(ctx.SelfID));
}
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 79d691d35ae..2fdf4b38e43 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -336,7 +336,7 @@ class TDataShard
EvMediatorRestoreBackup,
EvRemoveLockChangeRecords,
EvCdcStreamScanRegistered,
- EvCdcStreamScanProgress,
+ EvCdcStreamScanProgress, // WARNING: tests use ES_PRIVATE + 24
EvCdcStreamScanContinue,
EvRestartOperation, // used to restart after an aborted scan (e.g. backup)
EvChangeExchangeExecuteHandshakes,
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index dc5249d84c3..5e67c20e02c 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2366,6 +2366,79 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(InitialScanAndLimits) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableChangefeedInitialScan(true)
+ .SetChangesQueueItemsLimit(1)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ TVector<THolder<IEventHandle>> delayed;
+ ui32 progressCount = 0;
+
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ static constexpr ui32 EvCdcStreamScanProgress = EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 24;
+
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCdcStreamScanRequest:
+ if (auto* msg = ev->Get<TEvDataShard::TEvCdcStreamScanRequest>()) {
+ msg->Record.MutableLimits()->SetBatchMaxRows(1);
+ } else {
+ UNIT_ASSERT(false);
+ }
+ break;
+
+ case TEvChangeExchange::EvEnqueueRecords:
+ delayed.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+
+ case EvCdcStreamScanProgress:
+ ++progressCount;
+ break;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+
+ if (delayed.empty()) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed, &progressCount](IEventHandle&) {
+ return !delayed.empty() && progressCount >= 2;
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ runtime.SetObserverFunc(prevObserver);
+ for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ });
+ }
+
Y_UNIT_TEST(AwsRegion) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())