about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author    snaury <snaury@ydb.tech>  2023-05-15 15:05:51 +0300
committer snaury <snaury@ydb.tech>  2023-05-15 15:05:51 +0300
commit   dad0c8aa91c4dc2a764dc9585b7efbf1b789b98e (patch)
tree     8f27284a483a2597a4d4d5c3b47ccdde645a2a4e
parent   03441e363777436aa0f01f46c579cf9084d1249f (diff)
download ydb-dad0c8aa91c4dc2a764dc9585b7efbf1b789b98e.tar.gz
Finish executing read operation when cancelled during a page fault
-rw-r--r--  ydb/core/tx/datashard/datashard__read_iterator.cpp   | 13
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 98
2 files changed, 106 insertions, 5 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index b2bf66b1269..f917121b6ca 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1700,13 +1700,11 @@ public:
<< ": at tablet# " << Self->TabletID());
auto it = Self->ReadIterators.find(ReadId);
- if (it == Self->ReadIterators.end()) {
- // iterator aborted
+ if (it == Self->ReadIterators.end() && !Op) {
+ // iterator aborted before we could start operation
return true;
}
- auto& state = *it->second;
-
try {
// If tablet is in follower mode then we should sync scheme
// before we build and check operation.
@@ -1719,13 +1717,18 @@ public:
}
if (status != NKikimrTxDataShard::TError::OK) {
+ Y_VERIFY_DEBUG(!Op);
+ if (Y_UNLIKELY(it == Self->ReadIterators.end())) {
+ // iterator already aborted
+ return true;
+ }
std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
SetStatusError(
result->Record,
Ydb::StatusIds::INTERNAL_ERROR,
TStringBuilder() << "Failed to sync follower: " << errMessage);
result->Record.SetReadId(ReadId.ReadId);
- SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), result.release());
+ SendViaSession(it->second->SessionId, ReadId.Sender, Self->SelfId(), result.release());
return true;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 7f9a176a203..6ea3cac5d1c 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/converter.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/tablet_flat/shared_cache_events.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/tx_proxy/read_table.h>
@@ -3575,4 +3576,101 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
}
};
+Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
+ Y_UNIT_TEST(CancelPageFaultedReadThenDropTable) {
+ TPortManager pm;
+ NFake::TCaches caches;
+ caches.Shared = 1 /* bytes */;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetCacheParams(caches);
+ TServer::TPtr server = new TServer(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);
+ // runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .ExecutorCacheSize(1 /* byte */)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6)"));
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+ CompactTable(runtime, shard1, tableId1, false);
+ RebootTablet(runtime, shard1, sender);
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ size_t observedReadResults = 0;
+ bool captureCacheRequests = true;
+ std::vector<std::unique_ptr<IEventHandle>> capturedCacheRequests;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvReadResult::EventType: {
+ auto* msg = ev->Get<TEvDataShard::TEvReadResult>();
+ Cerr << "... observed TEvReadResult:\n" << msg->ToString() << Endl;
+ observedReadResults++;
+ break;
+ }
+ case NSharedCache::TEvRequest::EventType: {
+ if (captureCacheRequests) {
+ Cerr << "... captured TEvRequest" << Endl;
+ capturedCacheRequests.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ auto readSender = runtime.AllocateEdgeActor();
+ auto tabletPipe = runtime.ConnectToPipe(shard1, readSender, 0, NTabletPipe::TClientConfig());
+ {
+ auto request = std::make_unique<TEvDataShard::TEvRead>();
+ request->Record.SetReadId(1);
+ request->Record.MutableTableId()->SetOwnerId(tableId1.PathId.OwnerId);
+ request->Record.MutableTableId()->SetTableId(tableId1.PathId.LocalPathId);
+ request->Record.MutableTableId()->SetSchemaVersion(tableId1.SchemaVersion);
+ request->Record.AddColumns(1);
+ request->Record.AddColumns(2);
+ request->Ranges.emplace_back(TOwnedCellVec(), true, TOwnedCellVec(), true);
+ runtime.SendToPipe(tabletPipe, readSender, request.release());
+ }
+
+ WaitFor(runtime, [&]() { return capturedCacheRequests.size() > 0 || observedReadResults > 0; }, "shared cache request");
+ UNIT_ASSERT_C(capturedCacheRequests.size() > 0, "cache request was not captured");
+
+ {
+ auto request = std::make_unique<TEvDataShard::TEvReadCancel>();
+ request->Record.SetReadId(1);
+ runtime.SendToPipe(tabletPipe, readSender, request.release());
+ }
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ captureCacheRequests = false;
+ for (auto& ev : capturedCacheRequests) {
+ runtime.Send(ev.release(), 0, true);
+ }
+
+ // We should be able to drop table
+ WaitTxNotification(server, AsyncDropTable(server, sender, "/Root", "table-1"));
+ }
+}
+
} // namespace NKikimr