diff options
author | ilnaz <[email protected]> | 2023-06-28 22:41:00 +0300 |
---|---|---|
committer | ilnaz <[email protected]> | 2023-06-28 22:41:00 +0300 |
commit | 13155eed3e752c8239f113b67767a2cc74ea0f17 (patch) | |
tree | 951edab2466fa50e7fbce7b861e3e1101b203a7a | |
parent | fef9e85477600ab08d07ff1fbfa09f8096a13074 (diff) |
Do not lose sender when rescheduling
-rw-r--r-- | ydb/core/tx/datashard/cdc_stream_scan.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 73 |
3 files changed, 75 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 44761764bab..ffd8ad5e5f8 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -320,7 +320,7 @@ public: << ": streamPathId# " << Request->Get()->StreamPathId); // re-schedule tx - ctx.Schedule(TDuration::Seconds(1), Request->Release().Release()); + ctx.TActivationContext::Schedule(TDuration::Seconds(1), Request->Forward(ctx.SelfID)); } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 79d691d35ae..2fdf4b38e43 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -336,7 +336,7 @@ class TDataShard EvMediatorRestoreBackup, EvRemoveLockChangeRecords, EvCdcStreamScanRegistered, - EvCdcStreamScanProgress, + EvCdcStreamScanProgress, // WARNING: tests use ES_PRIVATE + 24 EvCdcStreamScanContinue, EvRestartOperation, // used to restart after an aborted scan (e.g. backup) EvChangeExchangeExecuteHandshakes, diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index dc5249d84c3..5e67c20e02c 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2366,6 +2366,79 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(InitialScanAndLimits) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetChangesQueueItemsLimit(1) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + TVector<THolder<IEventHandle>> delayed; + ui32 progressCount = 0; + + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + static constexpr ui32 EvCdcStreamScanProgress = EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 24; + + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCdcStreamScanRequest: + if (auto* msg = ev->Get<TEvDataShard::TEvCdcStreamScanRequest>()) { + msg->Record.MutableLimits()->SetBatchMaxRows(1); + } else { + UNIT_ASSERT(false); + } + break; + + case TEvChangeExchange::EvEnqueueRecords: + delayed.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + + case EvCdcStreamScanProgress: + ++progressCount; + break; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + if (delayed.empty()) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed, &progressCount](IEventHandle&) { + return !delayed.empty() && progressCount >= 2; + }); + runtime.DispatchEvents(opts); + } + + runtime.SetObserverFunc(prevObserver); + for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) { + runtime.Send(ev.Release(), 0, true); + } + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + }); + } + Y_UNIT_TEST(AwsRegion) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) |