author     snaury <[email protected]>    2023-08-10 10:06:08 +0300
committer  snaury <[email protected]>    2023-08-10 11:02:36 +0300
commit     4d050ae447ea856fc2ba184d511f70f8fa3a3521 (patch)
tree       c9660ae048d4e583ce823260aa3a06033532e5b8
parent     de7011fdcbc2c97cac915ac397a21057e64267b0 (diff)
Fix unsafe QueueScan during or immediately after a split/merge KIKIMR-18986
-rw-r--r--  ydb/core/tx/datashard/build_index.cpp            31
-rw-r--r--  ydb/core/tx/datashard/datashard__kqp_scan.cpp    30
-rw-r--r--  ydb/core/tx/datashard/datashard_impl.h            5
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp  147
4 files changed, 211 insertions(+), 2 deletions(-)
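
The core of the fix, visible in both handler diffs below, is that the raw actor handler no longer queues a scan directly. Instead it wraps the event in a local tablet transaction (TTxHandleSafeKqpScan / TTxHandleSafeBuildIndexScan), and the real work moves into a HandleSafe method that also rejects the request when the shard is not active. A minimal sketch of that deferral pattern, independent of YDB internals; MiniTablet, Event, and ExecutePending are hypothetical names, while the real code derives from NTabletFlatExecutor::TTransactionBase<TDataShard>:

// Deferral sketch (hypothetical names): the handler only schedules a unit
// of work, and the readiness check runs when that unit executes.
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <utility>

struct Event { std::string Payload; };

class MiniTablet {
public:
    // Raw handler: touches no shard state, only enqueues a transaction,
    // mirroring Handle() -> Execute(new TTxHandleSafe...).
    void Handle(Event ev) {
        Txs.push([this, ev = std::move(ev)] { HandleSafe(ev); });
    }

    // Stand-in for the tablet executor running queued transactions.
    void ExecutePending() {
        while (!Txs.empty()) {
            auto tx = std::move(Txs.front());
            Txs.pop();
            tx(); // Execute(); Complete() is a no-op in this sketch
        }
    }

    bool Active = true;

private:
    // Safe handler: the check and the work happen in one serialized unit,
    // like HandleSafe() with its IsStateActive() gate in the diffs below.
    void HandleSafe(const Event& ev) {
        if (!Active) {
            std::cout << "reject (shard not ready): " << ev.Payload << "\n";
            return;
        }
        std::cout << "queue scan: " << ev.Payload << "\n";
    }

    std::queue<std::function<void()>> Txs;
};

int main() {
    MiniTablet tablet;
    tablet.Handle(Event{"scan-1"});
    tablet.ExecutePending(); // prints "queue scan: scan-1"
}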
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp
index 7ec1c9c1ab6..79554928eb1 100644
--- a/ydb/core/tx/datashard/build_index.cpp
+++ b/ydb/core/tx/datashard/build_index.cpp
@@ -554,8 +554,31 @@ TAutoPtr<NTable::IScan> CreateBuildIndexScan(
buildIndexId, target, seqNo, dataShardId, datashardActorId, schemeshardActorId, range, targetIndexColumns, targetDataColumns, tableInfo, limits);
}
+class TDataShard::TTxHandleSafeBuildIndexScan : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+public:
+ TTxHandleSafeBuildIndexScan(TDataShard* self, TEvDataShard::TEvBuildIndexCreateRequest::TPtr&& ev)
+ : TTransactionBase(self)
+ , Ev(std::move(ev))
+ {}
+
+ bool Execute(TTransactionContext&, const TActorContext& ctx) {
+ Self->HandleSafe(Ev, ctx);
+ return true;
+ }
+
+ void Complete(const TActorContext&) {
+ // nothing
+ }
+
+private:
+ TEvDataShard::TEvBuildIndexCreateRequest::TPtr Ev;
+};
+
+void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext&) {
+ Execute(new TTxHandleSafeBuildIndexScan(this, std::move(ev)));
+}
-void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx) {
+void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
// Note: it's very unlikely that we have volatile txs before this snapshot
@@ -648,6 +671,12 @@ void TDataShard::Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, cons
return;
}
+ if (!IsStateActive()) {
+ badRequest(TStringBuilder() << "Shard " << TabletID() << " is not ready for requests");
+ ctx.Send(ev->Sender, std::move(response));
+ return;
+ }
+
TScanOptions scanOpts;
scanOpts.SetSnapshotRowVersion(TRowVersion(snapshotKey.Step, snapshotKey.TxId));
scanOpts.SetResourceBroker("build_index", 10);
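
Why the IsStateActive() check belongs inside the transaction rather than in the raw Handle(): judging from the commit title, the shard's state can change between event delivery and the moment the scan is actually queued, so a check made at delivery time can be stale. A toy timeline of that check-then-act gap (the enum value is a hypothetical stand-in for the datashard's real split states):

#include <iostream>

enum class EState { Active, SplitSrcMakeSnapshot };

int main() {
    EState state = EState::Active;

    // Unsafe shape: the readiness check happens at event-delivery time...
    bool checkedActive = (state == EState::Active);

    // ...but a split commits before the scan would have been queued.
    state = EState::SplitSrcMakeSnapshot;

    std::cout << "checked at delivery:  " << checkedActive << "\n";              // 1
    std::cout << "actual at queue time: " << (state == EState::Active) << "\n";  // 0
    // Running the check and QueueScan in one TTransactionBase unit,
    // as the diff above does, removes this gap.
}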
diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
index 681c75ccf02..eeb95476c59 100644
--- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
@@ -535,7 +535,31 @@ private:
TOwnedCellVec LastKey;
};
+class TDataShard::TTxHandleSafeKqpScan : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+public:
+ TTxHandleSafeKqpScan(TDataShard* self, TEvDataShard::TEvKqpScan::TPtr&& ev)
+ : TTransactionBase(self)
+ , Ev(std::move(ev))
+ {}
+
+ bool Execute(TTransactionContext&, const TActorContext& ctx) {
+ Self->HandleSafe(Ev, ctx);
+ return true;
+ }
+
+ void Complete(const TActorContext&) {
+ // nothing
+ }
+
+private:
+ TEvDataShard::TEvKqpScan::TPtr Ev;
+};
+
void TDataShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext&) {
+ Execute(new TTxHandleSafeKqpScan(this, std::move(ev)));
+}
+
+void TDataShard::HandleSafe(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext&) {
auto& request = ev->Get()->Record;
auto scanComputeActor = ev->Sender;
auto generation = request.GetGeneration();
@@ -619,6 +643,12 @@ void TDataShard::Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext&
return;
}
+ if (!IsStateActive()) {
+ reportError(request.GetTablePath(), TStringBuilder() << "TxId: " << request.GetTxId() << "."
+ << " Shard " << TabletID() << " is not ready to process requests.");
+ return;
+ }
+
Pipeline.StartStreamingTx(snapshot.GetTxId(), 1);
TSmallVec<TSerializedTableRange> ranges;
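
On the KQP path the same guard surfaces as a scan error to the compute actor. Presumably the caller treats "shard is not ready" as retryable after re-resolving the table's partitioning, since after a split the old shard no longer serves the range. A hypothetical caller-side loop in that spirit (StartScan and the statuses are stand-ins, not the real executer API):

#include <iostream>

enum class EScanStatus { Ok, NotReady };

// Stand-in for issuing TEvKqpScan and observing the reply; the shard
// here pretends to finish its split after the first attempt.
EScanStatus StartScan(int attempt) {
    return attempt == 0 ? EScanStatus::NotReady : EScanStatus::Ok;
}

int main() {
    for (int attempt = 0; attempt < 3; ++attempt) {
        if (StartScan(attempt) == EScanStatus::Ok) {
            std::cout << "scan started on attempt " << attempt << "\n";
            return 0;
        }
        std::cout << "shard not ready, re-resolving partitions\n";
    }
    return 1; // give up after a few attempts
}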
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 617ec94b7ff..e20a188bddc 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -235,6 +235,9 @@ class TDataShard
class TTxReadViaPipeline;
class TReadOperation;
+ class TTxHandleSafeKqpScan;
+ class TTxHandleSafeBuildIndexScan;
+
ITransaction *CreateTxMonitoring(TDataShard *self,
NMon::TEvRemoteHttpInfo::TPtr ev);
ITransaction *CreateTxGetInfo(TDataShard *self,
@@ -1210,6 +1213,7 @@ class TDataShard
void Handle(TEvDataShard::TEvGetTableStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx);
+ void HandleSafe(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvConditionalEraseRowsRequest::TPtr& ev, const TActorContext& ctx);
@@ -1238,6 +1242,7 @@ class TDataShard
void Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx);
+ void HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
index b5d2a176ec0..271c35a3476 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
@@ -1,9 +1,11 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/core/kqp/rm_service/kqp_snapshot_manager.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/datashard/datashard_ut_common_kqp.h>
#include <ydb/core/tx/datashard/datashard_ut_common.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/base/blobstorage.h>
namespace NKikimr {
namespace NKqp {
@@ -636,8 +638,151 @@ Y_UNIT_TEST_SUITE(KqpScan) {
UNIT_ASSERT_VALUES_EQUAL(incomingRangesSize, 3);
}
+ Y_UNIT_TEST(ScanAfterSplitSlowMetaRead) {
+ NKikimrConfig::TAppConfig appCfg;
+
+ auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
+ rm->SetChannelBufferSize(100);
+ rm->SetMinChannelBufferSize(100);
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetNodeCount(2)
+ .SetAppConfig(appCfg)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ EnableLogging(runtime);
+
+ InitRoot(server, sender);
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ ExecSQL(server, sender, FillTableQuery());
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ std::optional<int> result;
+ std::optional<Ydb::StatusIds::StatusCode> status;
+ auto streamSender = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+ Cerr << "... response " << ev->GetTypeRewrite() << " " << ev->GetTypeName() << " " << ev->ToString() << Endl;
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TEvKqpExecuter::TEvStreamData::EventType: {
+ auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>();
+ auto& record = msg->Record;
+ Y_ASSERT(record.GetResultSet().rows().size() == 1);
+ Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
+ result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(record.GetSeqNo());
+ resp->Record.SetFreeSpace(100);
+ ctx.Send(ev->Sender, resp.Release());
+ break;
+ }
+ case NKqp::TEvKqp::TEvQueryResponse::EventType: {
+ auto* msg = ev->Get<NKqp::TEvKqp::TEvQueryResponse>();
+ auto& record = msg->Record.GetRef();
+ status = record.GetYdbStatus();
+ break;
+ }
+ }
+ }));
+
+ SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, "SELECT sum(value) FROM `/Root/table-1`;", false));
+ waitFor([&]{ return bool(status); }, "request status");
+
+ UNIT_ASSERT_VALUES_EQUAL(*status, Ydb::StatusIds::SUCCESS);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 596400);
+
+ SetSplitMergePartCountLimit(&runtime, -1);
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+
+ TVector<THolder<IEventHandle>> blockedGets;
+ TVector<THolder<IEventHandle>> blockedSnapshots;
+ auto blockGetObserver = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TEvKqpSnapshot::TEvCreateSnapshotResponse::EventType: {
+ Cerr << "... blocking snapshot response" << Endl;
+ blockedSnapshots.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ case TEvBlobStorage::TEvGet::EventType: {
+ auto* msg = ev->Get<TEvBlobStorage::TEvGet>();
+ bool block = false;
+ for (ui32 i = 0; i < msg->QuerySize; ++i) {
+ if (msg->Queries[i].Id.TabletID() == shards.at(0)) {
+ Cerr << "... blocking get request to " << msg->Queries[i].Id << Endl;
+ block = true;
+ }
+ }
+ if (block) {
+ blockedGets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(blockGetObserver);
+
+ result = {};
+ status = {};
+ SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, "SELECT sum(value) FROM `/Root/table-1`;", false));
+
+ waitFor([&]{ return blockedSnapshots.size() > 0; }, "snapshot response");
+
+ // Start a split; surprisingly, it succeeds despite the blocked events
+ auto senderSplit = runtime.AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/table-1", shards.at(0), 55 /* splitKey */);
+ WaitTxNotification(server, senderSplit, txId);
+
+ // Unblock snapshot and try waiting for results
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : blockedSnapshots) {
+ ui32 nodeIdx = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0);
+ Cerr << "... unblocking snapshot" << Endl;
+ runtime.Send(ev.Release(), nodeIdx, true);
+ }
+ blockedSnapshots.clear();
+
+ SimulateSleep(runtime, TDuration::Seconds(1));
+ UNIT_ASSERT_C(!status, "Query finished with status: " << *status);
+ UNIT_ASSERT_C(!result, "Query returned unexpected result: " << *result);
+
+ // Unblock storage reads and wait for result
+ for (auto& ev : blockedGets) {
+ ui32 nodeIdx = ev->GetRecipientRewrite().NodeId() - runtime.GetNodeId(0);
+ Cerr << "... unblocking get" << Endl;
+ runtime.Send(ev.Release(), nodeIdx, true);
+ }
+ blockedGets.clear();
+
+ waitFor([&]{ return bool(status); }, "request finish");
+ UNIT_ASSERT_VALUES_EQUAL(*status, Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 596400);
+ }
+
}
} // namespace NKqp
} // namespace NKikimr
-
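
The test drives the race with TTestActorRuntime's observer hook: events are intercepted before delivery, selected ones are swallowed into a buffer (EEventAction::DROP), and later re-sent to release the system in a controlled order. The same block-and-release technique reduced to a toy event loop (all types here are hypothetical; the real test uses SetObserverFunc as shown above):

#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct Ev { std::string Type, Payload; };

class Loop {
public:
    // Observer sees every event before delivery; returning true drops it
    // (the caller buffers it), like EEventAction::DROP in the test.
    using Observer = std::function<bool(const Ev&)>;

    void SetObserver(Observer obs) { Obs = std::move(obs); }
    void Send(Ev ev) { Pending.push(std::move(ev)); }

    void Dispatch() {
        while (!Pending.empty()) {
            Ev ev = std::move(Pending.front());
            Pending.pop();
            if (Obs && Obs(ev)) continue; // dropped by the observer
            std::cout << "deliver " << ev.Type << ": " << ev.Payload << "\n";
        }
    }

private:
    Observer Obs;
    std::queue<Ev> Pending;
};

int main() {
    Loop loop;
    std::vector<Ev> blocked;

    // Block everything that looks like a storage read, as the test does
    // with TEvBlobStorage::TEvGet aimed at the old shard.
    loop.SetObserver([&](const Ev& ev) {
        if (ev.Type == "TEvGet") {
            blocked.push_back(ev);
            return true;
        }
        return false;
    });

    loop.Send({"TEvGet", "read table meta"});
    loop.Send({"TEvStreamData", "row"});
    loop.Dispatch(); // only TEvStreamData is delivered

    // Release: clear the observer and replay the buffered events,
    // mirroring runtime.Send(ev.Release(), nodeIdx, true) in the test.
    loop.SetObserver({});
    for (auto& ev : blocked) {
        loop.Send(std::move(ev));
    }
    blocked.clear();
    loop.Dispatch(); // now the blocked TEvGet goes through
}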