diff options
author | snaury <snaury@ydb.tech> | 2023-03-24 09:58:58 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-03-24 09:58:58 +0300 |
commit | 7d1555dfe2c8d5948a187d0e6f64d35f5f4762bb (patch) | |
tree | a42579e8e508e74484b9c5a28b509db2b569ccee | |
parent | d10a044a8ef0e1ac2af81bcb491d957d26044c51 (diff) | |
download | ydb-7d1555dfe2c8d5948a187d0e6f64d35f5f4762bb.tar.gz |
Test cancellation of enqueued snapshot reads
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 122 |
2 files changed, 122 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 692de517fd..3b06366a6b 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1797,9 +1797,8 @@ bool TPipeline::HasWaitingReadIterator(const TReadIteratorId& readId) { bool TPipeline::CancelWaitingReadIterator(const TReadIteratorId& readId) { auto it = WaitingReadIteratorsById.find(readId); if (it != WaitingReadIteratorsById.end()) { - if (!it->second->Cancelled) { - it->second->Cancelled = true; - } + it->second->Cancelled = true; + WaitingReadIteratorsById.erase(it); return true; } @@ -1814,12 +1813,10 @@ void TPipeline::RegisterWaitingReadIterator(const TReadIteratorId& readId, TEvDa bool TPipeline::HandleWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event) { auto it = WaitingReadIteratorsById.find(readId); if (it != WaitingReadIteratorsById.end() && it->second == event) { - bool ok = !it->second->Cancelled; WaitingReadIteratorsById.erase(it); - return ok; } - return true; + return !event->Cancelled; } TRowVersion TPipeline::GetReadEdge() const { diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index e626a07cc0..a8c4728c60 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -551,12 +551,11 @@ struct TTestHelper { return event; } - std::unique_ptr<TEvDataShard::TEvReadResult> SendRead( + void SendReadAsync( const TString& tableName, TEvDataShard::TEvRead* request, ui32 node = 0, - TActorId sender = {}, - TDuration timeout = TDuration::Max()) + TActorId sender = {}) { if (!sender) { sender = Sender; @@ -571,6 +570,16 @@ struct TTestHelper { node, GetTestPipeConfig(), table.ClientId); + } + + std::unique_ptr<TEvDataShard::TEvReadResult> SendRead( + const TString& tableName, + TEvDataShard::TEvRead* request, + ui32 node = 0, + TActorId sender = {}, + TDuration timeout = TDuration::Max()) + { + SendReadAsync(tableName, request, node, sender); return WaitReadResult(timeout); } @@ -2871,6 +2880,113 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL); } + Y_UNIT_TEST(ShouldCancelMvccSnapshotFromFuture) { + // checks that when snapshot is in the future, we can cancel it + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + TTestHelper helper(serverSettings); + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + helper.Server->GetRuntime()->DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + } + }; + + bool captureTimecast = false; + bool captureWaitNotify = false; + + TRowVersion snapshot = TRowVersion::Min(); + ui64 lastStep = 0; + ui64 waitPlanStep = 0; + ui64 notifyPlanStep = 0; + size_t readResults = 0; + + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto { + switch (event->GetTypeRewrite()) { + case TEvMediatorTimecast::EvUpdate: { + if (captureTimecast) { + auto update = event->Get<TEvMediatorTimecast::TEvUpdate>(); + lastStep = update->Record.GetTimeBarrier(); + Cerr << "---- dropped EvUpdate ----" << Endl; + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + case TEvMediatorTimecast::EvWaitPlanStep: { + if (captureWaitNotify) { + auto waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>(); + waitPlanStep = waitEvent->PlanStep; + } + break; + } + case TEvMediatorTimecast::EvNotifyPlanStep: { + if (captureWaitNotify) { + auto notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>(); + notifyPlanStep = notifyEvent->PlanStep; + } + break; + } + case TEvDataShard::EvReadResult: { + ++readResults; + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = helper.Server->GetRuntime()->SetObserverFunc(captureEvents); + + // check transaction waits for proper plan step + captureTimecast = true; + + // note that we need this to capture snapshot version + ExecSQL(helper.Server, helper.Sender, R"( + UPSERT INTO `/Root/table-1` + (key1, key2, key3, value) + VALUES + (3, 3, 3, 300); + )"); + + waitFor([&]{ return lastStep != 0; }, "intercepted TEvUpdate"); + + captureTimecast = false; + captureWaitNotify = true; + + // future snapshot + snapshot = TRowVersion(lastStep + 1000, Max<ui64>()); + + auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, snapshot); + AddKeyQuery(*request1, {3, 3, 3}); + AddKeyQuery(*request1, {1, 1, 1}); + AddKeyQuery(*request1, {5, 5, 5}); + request1->Record.SetMaxRowsInResult(1); + + helper.SendReadAsync("table-1", request1.release()); + + waitFor([&]{ return waitPlanStep != 0; }, "intercepted TEvWaitPlanStep"); + UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step); + UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, 0); + + helper.SendCancel("table-1", 1); + + waitFor([&]{ return notifyPlanStep != 0; }, "intercepted TEvNotifyPlanStep"); + UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step); + UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, snapshot.Step); + + SimulateSleep(helper.Server, TDuration::Seconds(2)); + + UNIT_ASSERT_VALUES_EQUAL(readResults, 0); + } + Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKey) { TTestHelper helper; |