diff options
author | snaury <[email protected]> | 2023-08-10 10:06:08 +0300 |
---|---|---|
committer | snaury <[email protected]> | 2023-08-10 11:02:36 +0300 |
commit | 4d050ae447ea856fc2ba184d511f70f8fa3a3521 (patch) | |
tree | c9660ae048d4e583ce823260aa3a06033532e5b8 | |
parent | de7011fdcbc2c97cac915ac397a21057e64267b0 (diff) |
Fix unsafe QueueScan during or immediately after a split/merge KIKIMR-18986
-rw-r--r-- | ydb/core/tx/datashard/build_index.cpp | 31 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__kqp_scan.cpp | 30 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp | 147 |
4 files changed, 211 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp index 7ec1c9c1ab6..79554928eb1 100644 --- a/ydb/core/tx/datashard/build_index.cpp +++ b/ydb/core/tx/datashard/build_index.cpp @@ -554,8 +554,31 @@ TAutoPtr<NTable::IScan> CreateBuildIndexScan( buildIndexId, target, seqNo, dataShardId, datashardActorId, schemeshardActorId, range, targetIndexColumns, targetDataColumns, tableInfo, limits); } +class TDataShard::TTxHandleSafeBuildIndexScan : public NTabletFlatExecutor::TTransactionBase<TDataShard> { +public: + TTxHandleSafeBuildIndexScan(TDataShard* self, TEvDataShard::TEvBuildIndexCreateRequest::TPtr&& ev) + : TTransactionBase(self) + , Ev(std::move(ev)) + {} + + bool Execute(TTransactionContext&, const TActorContext& ctx) { + Self->HandleSafe(Ev, ctx); + return true; + } + + void Complete(const TActorContext&) { + // nothing + } + +private: + TEvDataShard::TEvBuildIndexCreateRequest::TPtr Ev; +}; + +void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext&) { + Execute(new TTxHandleSafeBuildIndexScan(this, std::move(ev))); +} -void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx) { +void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; // Note: it's very unlikely that we have volatile txs before this snapshot @@ -648,6 +671,12 @@ void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, cons return; } + if (!IsStateActive()) { + badRequest(TStringBuilder() << "Shard " << TabletID() << " is not ready for requests"); + ctx.Send(ev->Sender, std::move(response)); + return; + } + TScanOptions scanOpts; scanOpts.SetSnapshotRowVersion(TRowVersion(snapshotKey.Step, snapshotKey.TxId)); scanOpts.SetResourceBroker("build_index", 10); diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp index 
681c75ccf02..eeb95476c59 100644 --- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp @@ -535,7 +535,31 @@ private: TOwnedCellVec LastKey; }; +class TDataShard::TTxHandleSafeKqpScan : public NTabletFlatExecutor::TTransactionBase<TDataShard> { +public: + TTxHandleSafeKqpScan(TDataShard* self, TEvDataShard::TEvKqpScan::TPtr&& ev) + : TTransactionBase(self) + , Ev(std::move(ev)) + {} + + bool Execute(TTransactionContext&, const TActorContext& ctx) { + Self->HandleSafe(Ev, ctx); + return true; + } + + void Complete(const TActorContext&) { + // nothing + } + +private: + TEvDataShard::TEvKqpScan::TPtr Ev; +}; + void TDataShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext&) { + Execute(new TTxHandleSafeKqpScan(this, std::move(ev))); +} + +void TDataShard::HandleSafe(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext&) { auto& request = ev->Get()->Record; auto scanComputeActor = ev->Sender; auto generation = request.GetGeneration(); @@ -619,6 +643,12 @@ void TDataShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& return; } + if (!IsStateActive()) { + reportError(request.GetTablePath(), TStringBuilder() << "TxId: " << request.GetTxId() << "." 
+ << " Shard " << TabletID() << " is not ready to process requests."); + return; + } + Pipeline.StartStreamingTx(snapshot.GetTxId(), 1); TSmallVec<TSerializedTableRange> ranges; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 617ec94b7ff..e20a188bddc 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -235,6 +235,9 @@ class TDataShard class TTxReadViaPipeline; class TReadOperation; + class TTxHandleSafeKqpScan; + class TTxHandleSafeBuildIndexScan; + ITransaction *CreateTxMonitoring(TDataShard *self, NMon::TEvRemoteHttpInfo::TPtr ev); ITransaction *CreateTxGetInfo(TDataShard *self, @@ -1210,6 +1213,7 @@ class TDataShard void Handle(TEvDataShard::TEvGetTableStats::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx); + void HandleSafe(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvConditionalEraseRowsRequest::TPtr& ev, const TActorContext& ctx); @@ -1238,6 +1242,7 @@ class TDataShard void Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx); + void HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx); diff --git 
a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index b5d2a176ec0..271c35a3476 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -1,9 +1,11 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> +#include <ydb/core/kqp/rm_service/kqp_snapshot_manager.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/datashard/datashard_ut_common_kqp.h> #include <ydb/core/tx/datashard/datashard_ut_common.h> #include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/base/blobstorage.h> namespace NKikimr { namespace NKqp { @@ -636,8 +638,151 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT_VALUES_EQUAL(incomingRangesSize, 3); } + Y_UNIT_TEST(ScanAfterSplitSlowMetaRead) { + NKikimrConfig::TAppConfig appCfg; + + auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); + rm->SetChannelBufferSize(100); + rm->SetMinChannelBufferSize(100); + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetAppConfig(appCfg) + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + EnableLogging(runtime); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", 1); + ExecSQL(server, sender, FillTableQuery()); + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... 
failed to wait for " << description); + } + }; + + std::optional<int> result; + std::optional<Ydb::StatusIds::StatusCode> status; + auto streamSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { + Cerr << "... response " << ev->GetTypeRewrite() << " " << ev->GetTypeName() << " " << ev->ToString() << Endl; + switch (ev->GetTypeRewrite()) { + case NKqp::TEvKqpExecuter::TEvStreamData::EventType: { + auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>(); + auto& record = msg->Record; + Y_ASSERT(record.GetResultSet().rows().size() == 1); + Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); + result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(record.GetSeqNo()); + resp->Record.SetFreeSpace(100); + ctx.Send(ev->Sender, resp.Release()); + break; + } + case NKqp::TEvKqp::TEvQueryResponse::EventType: { + auto* msg = ev->Get<NKqp::TEvKqp::TEvQueryResponse>(); + auto& record = msg->Record.GetRef(); + status = record.GetYdbStatus(); + break; + } + } + })); + + SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, "SELECT sum(value) FROM `/Root/table-1`;", false)); + waitFor([&]{ return bool(status); }, "request status"); + + UNIT_ASSERT_VALUES_EQUAL(*status, Ydb::StatusIds::SUCCESS); + + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 596400); + + SetSplitMergePartCountLimit(&runtime, -1); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + TVector<THolder<IEventHandle>> blockedGets; + TVector<THolder<IEventHandle>> blockedSnapshots; + auto blockGetObserver = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case NKqp::TEvKqpSnapshot::TEvCreateSnapshotResponse::EventType: { + Cerr << "... 
blocking snapshot response" << Endl; + blockedSnapshots.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + case TEvBlobStorage::TEvGet::EventType: { + auto* msg = ev->Get<TEvBlobStorage::TEvGet>(); + bool block = false; + for (ui32 i = 0; i < msg->QuerySize; ++i) { + if (msg->Queries[i].Id.TabletID() == shards.at(0)) { + Cerr << "... blocking get request to " << msg->Queries[i].Id << Endl; + block = true; + } + } + if (block) { + blockedGets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(blockGetObserver); + + result = {}; + status = {}; + SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, "SELECT sum(value) FROM `/Root/table-1`;", false)); + + waitFor([&]{ return blockedSnapshots.size() > 0; }, "snapshot response"); + + // Start a split, surprisingly it will succeed despite blocked events + auto senderSplit = runtime.AllocateEdgeActor(); + ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/table-1", shards.at(0), 55 /* splitKey */); + WaitTxNotification(server, senderSplit, txId); + + // Unblock snapshot and try waiting for results + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : blockedSnapshots) { + ui32 nodeIdx = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0); + Cerr << "... unblocking snapshot" << Endl; + runtime.Send(ev.Release(), nodeIdx, true); + } + blockedSnapshots.clear(); + + SimulateSleep(runtime, TDuration::Seconds(1)); + UNIT_ASSERT_C(!status, "Query finished with status: " << *status); + UNIT_ASSERT_C(!result, "Query returned unexpected result: " << *result); + + // Unblock storage reads and wait for result + for (auto& ev : blockedGets) { + ui32 nodeIdx = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0); + Cerr << "... 
unblocking get" << Endl; + runtime.Send(ev.Release(), nodeIdx, true); + } + blockedGets.clear(); + + waitFor([&]{ return bool(status); }, "request finish"); + UNIT_ASSERT_VALUES_EQUAL(*status, Ydb::StatusIds::SUCCESS); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 596400); + } + } } // namespace NKqp } // namespace NKikimr