aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-03-24 09:58:58 +0300
committersnaury <snaury@ydb.tech>2023-03-24 09:58:58 +0300
commit7d1555dfe2c8d5948a187d0e6f64d35f5f4762bb (patch)
treea42579e8e508e74484b9c5a28b509db2b569ccee
parentd10a044a8ef0e1ac2af81bcb491d957d26044c51 (diff)
downloadydb-7d1555dfe2c8d5948a187d0e6f64d35f5f4762bb.tar.gz
Test cancellation of enqueued snapshot reads
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp9
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp122
2 files changed, 122 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 692de517fd..3b06366a6b 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1797,9 +1797,8 @@ bool TPipeline::HasWaitingReadIterator(const TReadIteratorId& readId) {
bool TPipeline::CancelWaitingReadIterator(const TReadIteratorId& readId) {
auto it = WaitingReadIteratorsById.find(readId);
if (it != WaitingReadIteratorsById.end()) {
- if (!it->second->Cancelled) {
- it->second->Cancelled = true;
- }
+ it->second->Cancelled = true;
+ WaitingReadIteratorsById.erase(it);
return true;
}
@@ -1814,12 +1813,10 @@ void TPipeline::RegisterWaitingReadIterator(const TReadIteratorId& readId, TEvDa
bool TPipeline::HandleWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event) {
auto it = WaitingReadIteratorsById.find(readId);
if (it != WaitingReadIteratorsById.end() && it->second == event) {
- bool ok = !it->second->Cancelled;
WaitingReadIteratorsById.erase(it);
- return ok;
}
- return true;
+ return !event->Cancelled;
}
TRowVersion TPipeline::GetReadEdge() const {
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index e626a07cc0..a8c4728c60 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -551,12 +551,11 @@ struct TTestHelper {
return event;
}
- std::unique_ptr<TEvDataShard::TEvReadResult> SendRead(
+ void SendReadAsync(
const TString& tableName,
TEvDataShard::TEvRead* request,
ui32 node = 0,
- TActorId sender = {},
- TDuration timeout = TDuration::Max())
+ TActorId sender = {})
{
if (!sender) {
sender = Sender;
@@ -571,6 +570,16 @@ struct TTestHelper {
node,
GetTestPipeConfig(),
table.ClientId);
+ }
+
+ std::unique_ptr<TEvDataShard::TEvReadResult> SendRead(
+ const TString& tableName,
+ TEvDataShard::TEvRead* request,
+ ui32 node = 0,
+ TActorId sender = {},
+ TDuration timeout = TDuration::Max())
+ {
+ SendReadAsync(tableName, request, node, sender);
return WaitReadResult(timeout);
}
@@ -2871,6 +2880,113 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL);
}
+ Y_UNIT_TEST(ShouldCancelMvccSnapshotFromFuture) {
+ // checks that when snapshot is in the future, we can cancel it
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ TTestHelper helper(serverSettings);
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ helper.Server->GetRuntime()->DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ bool captureTimecast = false;
+ bool captureWaitNotify = false;
+
+ TRowVersion snapshot = TRowVersion::Min();
+ ui64 lastStep = 0;
+ ui64 waitPlanStep = 0;
+ ui64 notifyPlanStep = 0;
+ size_t readResults = 0;
+
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto {
+ switch (event->GetTypeRewrite()) {
+ case TEvMediatorTimecast::EvUpdate: {
+ if (captureTimecast) {
+ auto update = event->Get<TEvMediatorTimecast::TEvUpdate>();
+ lastStep = update->Record.GetTimeBarrier();
+ Cerr << "---- dropped EvUpdate ----" << Endl;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvMediatorTimecast::EvWaitPlanStep: {
+ if (captureWaitNotify) {
+ auto waitEvent = event->Get<TEvMediatorTimecast::TEvWaitPlanStep>();
+ waitPlanStep = waitEvent->PlanStep;
+ }
+ break;
+ }
+ case TEvMediatorTimecast::EvNotifyPlanStep: {
+ if (captureWaitNotify) {
+ auto notifyEvent = event->Get<TEvMediatorTimecast::TEvNotifyPlanStep>();
+ notifyPlanStep = notifyEvent->PlanStep;
+ }
+ break;
+ }
+ case TEvDataShard::EvReadResult: {
+ ++readResults;
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = helper.Server->GetRuntime()->SetObserverFunc(captureEvents);
+
+ // check transaction waits for proper plan step
+ captureTimecast = true;
+
+ // note that we need this to capture snapshot version
+ ExecSQL(helper.Server, helper.Sender, R"(
+ UPSERT INTO `/Root/table-1`
+ (key1, key2, key3, value)
+ VALUES
+ (3, 3, 3, 300);
+ )");
+
+ waitFor([&]{ return lastStep != 0; }, "intercepted TEvUpdate");
+
+ captureTimecast = false;
+ captureWaitNotify = true;
+
+ // future snapshot
+ snapshot = TRowVersion(lastStep + 1000, Max<ui64>());
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW, snapshot);
+ AddKeyQuery(*request1, {3, 3, 3});
+ AddKeyQuery(*request1, {1, 1, 1});
+ AddKeyQuery(*request1, {5, 5, 5});
+ request1->Record.SetMaxRowsInResult(1);
+
+ helper.SendReadAsync("table-1", request1.release());
+
+ waitFor([&]{ return waitPlanStep != 0; }, "intercepted TEvWaitPlanStep");
+ UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step);
+ UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, 0);
+
+ helper.SendCancel("table-1", 1);
+
+ waitFor([&]{ return notifyPlanStep != 0; }, "intercepted TEvNotifyPlanStep");
+ UNIT_ASSERT_VALUES_EQUAL(waitPlanStep, snapshot.Step);
+ UNIT_ASSERT_VALUES_EQUAL(notifyPlanStep, snapshot.Step);
+
+ SimulateSleep(helper.Server, TDuration::Seconds(2));
+
+ UNIT_ASSERT_VALUES_EQUAL(readResults, 0);
+ }
+
Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKey) {
TTestHelper helper;