diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-07-06 14:47:43 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-07-06 14:47:43 +0300 |
commit | a7b48042dd4d955b69ba9cb6af03b7111849e64c (patch) | |
tree | eb7476053a87f84f5663fa072a36f83f05c05a67 | |
parent | f3493e9e24c116f8dbd8508ba0742dad79ba1e6d (diff) | |
download | ydb-a7b48042dd4d955b69ba9cb6af03b7111849e64c.tar.gz |
Fix racy progress & drop KIKIMR-18606
-rw-r--r-- | ydb/core/tx/datashard/cdc_stream_scan.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 64 |
2 files changed, 81 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 16c28b6825..e49bb3fe32 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -163,6 +163,7 @@ class TDataShard::TTxCdcStreamScanProgress TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr Request; THolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue> Response; TVector<IDataShardChangeCollector::TChange> ChangeRecords; + bool Reschedule = false; static TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table) { TVector<TRawTypeValue> key(Reserve(cells.size())); @@ -219,16 +220,26 @@ public: LOG_D("Progress" << ": streamPathId# " << streamPathId); - ChangeRecords.clear(); - if (Self->CheckChangesQueueOverflow()) { + if (!Self->GetUserTables().contains(tablePathId.LocalPathId)) { + LOG_W("Cannot progress on unknown table" + << ": tablePathId# " << tablePathId); return true; } - Y_VERIFY(Self->GetUserTables().contains(tablePathId.LocalPathId)); auto table = Self->GetUserTables().at(tablePathId.LocalPathId); auto it = table->CdcStreams.find(streamPathId); - Y_VERIFY(it != table->CdcStreams.end()); + if (it == table->CdcStreams.end()) { + LOG_W("Cannot progress on unknown cdc stream" + << ": streamPathId# " << streamPathId); + return true; + } + + ChangeRecords.clear(); + if (Self->CheckChangesQueueOverflow()) { + Reschedule = true; + return true; + } NIceDb::TNiceDb db(txc.DB); bool pageFault = false; @@ -322,8 +333,8 @@ public: Self->EnqueueChangeRecords(std::move(ChangeRecords)); ctx.Send(Request->Sender, Response.Release()); - } else { - LOG_I("Re-run progress tx" + } else if (Reschedule) { + LOG_I("Re-schedule progress tx" << ": streamPathId# " << Request->Get()->StreamPathId); // re-schedule tx diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index e79f7d7db5..ec8ba239ab 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2502,6 +2502,70 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(InitialScanRacyProgressAndDrop) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetChangesQueueItemsLimit(1) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + bool delayProgress = true; + ui32 progressCount = 0; + TVector<THolder<IEventHandle>> delayed; + + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + static constexpr ui32 EvCdcStreamScanProgress = EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 24; + if (ev->GetTypeRewrite() == EvCdcStreamScanProgress) { + ++progressCount; + if (delayProgress) { + delayed.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto waitProgress = [&](ui32 count) { + if (progressCount != count) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&progressCount, count](IEventHandle&) { + return progressCount == count; + }); + runtime.DispatchEvents(opts); + } + }; + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + waitProgress(1); + WaitTxNotification(server, edgeActor, AsyncAlterDropStream(server, "/Root", "Table", "Stream")); + + delayProgress = false; + for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) { + runtime.Send(ev.Release(), 0, true); + } + + waitProgress(2); + } + Y_UNIT_TEST(AwsRegion) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) |