diff options
author | snaury <snaury@ydb.tech> | 2023-05-15 15:05:51 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-05-15 15:05:51 +0300 |
commit | dad0c8aa91c4dc2a764dc9585b7efbf1b789b98e (patch) | |
tree | 8f27284a483a2597a4d4d5c3b47ccdde645a2a4e | |
parent | 03441e363777436aa0f01f46c579cf9084d1249f (diff) | |
download | ydb-dad0c8aa91c4dc2a764dc9585b7efbf1b789b98e.tar.gz |
Finish executing read operation when cancelled during a page fault
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 98 |
2 files changed, 106 insertions(+), 5 deletions(-)
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index b2bf66b1269..f917121b6ca 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1700,13 +1700,11 @@ public: << ": at tablet# " << Self->TabletID()); auto it = Self->ReadIterators.find(ReadId); - if (it == Self->ReadIterators.end()) { - // iterator aborted + if (it == Self->ReadIterators.end() && !Op) { + // iterator aborted before we could start operation return true; } - auto& state = *it->second; - try { // If tablet is in follower mode then we should sync scheme // before we build and check operation. @@ -1719,13 +1717,18 @@ public: } if (status != NKikimrTxDataShard::TError::OK) { + Y_VERIFY_DEBUG(!Op); + if (Y_UNLIKELY(it == Self->ReadIterators.end())) { + // iterator already aborted + return true; + } std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); SetStatusError( result->Record, Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Failed to sync follower: " << errMessage); result->Record.SetReadId(ReadId.ReadId); - SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release()); + SendViaSession(it->second->SessionId, ReadId.Sender, Self->SelfId(), result.release()); return true; } diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 7f9a176a203..6ea3cac5d1c 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -6,6 +6,7 @@ #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/formats/arrow/converter.h> #include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/tablet_flat/shared_cache_events.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/tx_proxy/read_table.h> @@ -3575,4 +3576,101 @@ 
Y_UNIT_TEST_SUITE(DataShardReadIteratorState) { } }; +Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) { + Y_UNIT_TEST(CancelPageFaultedReadThenDropTable) { + TPortManager pm; + NFake::TCaches caches; + caches.Shared = 1 /* bytes */; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetCacheParams(caches); + TServer::TPtr server = new TServer(serverSettings); + + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO); + // runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + auto opts = TShardedTableOptions() + .Shards(1) + .ExecutorCacheSize(1 /* byte */) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6)")); + SimulateSleep(runtime, TDuration::Seconds(1)); + + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + CompactTable(runtime, shard1, tableId1, false); + RebootTablet(runtime, shard1, sender); + SimulateSleep(runtime, TDuration::Seconds(1)); + + size_t observedReadResults = 0; + bool captureCacheRequests = true; + std::vector<std::unique_ptr<IEventHandle>> capturedCacheRequests; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvReadResult::EventType: { + auto* msg = ev->Get<TEvDataShard::TEvReadResult>(); + Cerr << "... 
observed TEvReadResult:\n" << msg->ToString() << Endl; + observedReadResults++; + break; + } + case NSharedCache::TEvRequest::EventType: { + if (captureCacheRequests) { + Cerr << "... captured TEvRequest" << Endl; + capturedCacheRequests.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + auto readSender = runtime.AllocateEdgeActor(); + auto tabletPipe = runtime.ConnectToPipe(shard1, readSender, 0, NTabletPipe::TClientConfig()); + { + auto request = std::make_unique<TEvDataShard::TEvRead>(); + request->Record.SetReadId(1); + request->Record.MutableTableId()->SetOwnerId(tableId1.PathId.OwnerId); + request->Record.MutableTableId()->SetTableId(tableId1.PathId.LocalPathId); + request->Record.MutableTableId()->SetSchemaVersion(tableId1.SchemaVersion); + request->Record.AddColumns(1); + request->Record.AddColumns(2); + request->Ranges.emplace_back(TOwnedCellVec(), true, TOwnedCellVec(), true); + runtime.SendToPipe(tabletPipe, readSender, request.release()); + } + + WaitFor(runtime, [&]() { return capturedCacheRequests.size() > 0 || observedReadResults > 0; }, "shared cache request"); + UNIT_ASSERT_C(capturedCacheRequests.size() > 0, "cache request was not captured"); + + { + auto request = std::make_unique<TEvDataShard::TEvReadCancel>(); + request->Record.SetReadId(1); + runtime.SendToPipe(tabletPipe, readSender, request.release()); + } + SimulateSleep(runtime, TDuration::Seconds(1)); + + captureCacheRequests = false; + for (auto& ev : capturedCacheRequests) { + runtime.Send(ev.release(), 0, true); + } + + // We should be able to drop table + WaitTxNotification(server, AsyncDropTable(server, sender, "/Root", "table-1")); + } +} + } // namespace NKikimr |