aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-07-06 14:47:43 +0300
committerilnaz <ilnaz@ydb.tech>2023-07-06 14:47:43 +0300
commita7b48042dd4d955b69ba9cb6af03b7111849e64c (patch)
treeeb7476053a87f84f5663fa072a36f83f05c05a67
parentf3493e9e24c116f8dbd8508ba0742dad79ba1e6d (diff)
downloadydb-a7b48042dd4d955b69ba9cb6af03b7111849e64c.tar.gz
Fix racy progress & drop KIKIMR-18606
-rw-r--r--ydb/core/tx/datashard/cdc_stream_scan.cpp23
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp64
2 files changed, 81 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp
index 16c28b6825..e49bb3fe32 100644
--- a/ydb/core/tx/datashard/cdc_stream_scan.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp
@@ -163,6 +163,7 @@ class TDataShard::TTxCdcStreamScanProgress
TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr Request;
THolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue> Response;
TVector<IDataShardChangeCollector::TChange> ChangeRecords;
+ bool Reschedule = false;
static TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table) {
TVector<TRawTypeValue> key(Reserve(cells.size()));
@@ -219,16 +220,26 @@ public:
LOG_D("Progress"
<< ": streamPathId# " << streamPathId);
- ChangeRecords.clear();
- if (Self->CheckChangesQueueOverflow()) {
+ if (!Self->GetUserTables().contains(tablePathId.LocalPathId)) {
+ LOG_W("Cannot progress on unknown table"
+ << ": tablePathId# " << tablePathId);
return true;
}
- Y_VERIFY(Self->GetUserTables().contains(tablePathId.LocalPathId));
auto table = Self->GetUserTables().at(tablePathId.LocalPathId);
auto it = table->CdcStreams.find(streamPathId);
- Y_VERIFY(it != table->CdcStreams.end());
+ if (it == table->CdcStreams.end()) {
+ LOG_W("Cannot progress on unknown cdc stream"
+ << ": streamPathId# " << streamPathId);
+ return true;
+ }
+
+ ChangeRecords.clear();
+ if (Self->CheckChangesQueueOverflow()) {
+ Reschedule = true;
+ return true;
+ }
NIceDb::TNiceDb db(txc.DB);
bool pageFault = false;
@@ -322,8 +333,8 @@ public:
Self->EnqueueChangeRecords(std::move(ChangeRecords));
ctx.Send(Request->Sender, Response.Release());
- } else {
- LOG_I("Re-run progress tx"
+ } else if (Reschedule) {
+ LOG_I("Re-schedule progress tx"
<< ": streamPathId# " << Request->Get()->StreamPathId);
// re-schedule tx
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index e79f7d7db5..ec8ba239ab 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2502,6 +2502,70 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(InitialScanRacyProgressAndDrop) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableChangefeedInitialScan(true)
+ .SetChangesQueueItemsLimit(1)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ bool delayProgress = true;
+ ui32 progressCount = 0;
+ TVector<THolder<IEventHandle>> delayed;
+
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ static constexpr ui32 EvCdcStreamScanProgress = EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 24;
+ if (ev->GetTypeRewrite() == EvCdcStreamScanProgress) {
+ ++progressCount;
+ if (delayProgress) {
+ delayed.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto waitProgress = [&](ui32 count) {
+ if (progressCount != count) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&progressCount, count](IEventHandle&) {
+ return progressCount == count;
+ });
+ runtime.DispatchEvents(opts);
+ }
+ };
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+
+ waitProgress(1);
+ WaitTxNotification(server, edgeActor, AsyncAlterDropStream(server, "/Root", "Table", "Stream"));
+
+ delayProgress = false;
+ for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ waitProgress(2);
+ }
+
Y_UNIT_TEST(AwsRegion) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())