aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-08-10 15:46:43 +0300
committergvit <gvit@ydb.tech>2023-08-10 17:02:59 +0300
commit7258a05c46620b534ab2ad6f8207dccfa042bcb3 (patch)
treef6b07312cb875816e574f799365956db116a1106
parent22521ed2e6ba07acd9c2d3f3634d936a5cf3cb6f (diff)
downloadydb-7258a05c46620b534ab2ad6f8207dccfa042bcb3.tar.gz
fix bug with LastKey handling KIKIMR-18887
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp116
2 files changed, 124 insertions, 0 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index 275a1c68726..de296ca58c0 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -367,6 +367,14 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
if (!state.LastKey.empty()) {
PendingShards.front().LastKey = std::move(state.LastKey);
+ while(!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) {
+ CA_LOG_D("Nothing to read " << PendingShards.front().ToString(KeyColumnTypes));
+ auto readShard = std::move(PendingShards.front());
+ PendingShards.pop_front();
+ PendingShards.front().LastKey = std::move(readShard.LastKey);
+ }
+
+ YQL_ENSURE(!PendingShards.empty());
}
if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
index 271c35a3476..971eb1476aa 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
@@ -245,6 +245,122 @@ Y_UNIT_TEST_SUITE(KqpScan) {
UNIT_ASSERT_VALUES_EQUAL(result, 596400);
}
+ Y_UNIT_TEST(ScanDuringSplit10) {
+ NKikimrConfig::TAppConfig appCfg;
+
+ auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
+ rm->SetChannelBufferSize(100);
+ rm->SetMinChannelBufferSize(100);
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetNodeCount(2)
+ .SetAppConfig(appCfg)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+ auto senderSplit = runtime.AllocateEdgeActor();
+
+ EnableLogging(runtime);
+
+ SetSplitMergePartCountLimit(&runtime, -1);
+
+ InitRoot(server, sender);
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ ExecSQL(server, sender, FillTableQuery());
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ for (const auto& shard: shards) {
+ Cerr << (TStringBuilder() << "-- shardId=" << shard << Endl);
+ Cerr.Flush();
+ }
+
+ TSet<TActorId> scans;
+ TActorId firstScanActor;
+ ui64 tabletId = 0;
+
+ ui64 result = 0;
+
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
+ auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ for (auto& [shardId, nodeId]: msg->ShardNodes) {
+ tabletId = shardId;
+ Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl);
+ Cerr.Flush();
+ }
+ break;
+ }
+
+ case TEvDataShard::EvKqpScan: {
+ Cerr << (TStringBuilder() << "-- EvScan " << ev->Sender << " -> " << ev->Recipient << Endl);
+ Cerr.Flush();
+ break;
+ }
+
+ /*
+ * Respond to streamData with acks. Without that execution pipeline will stop
+ * producing new tuples.
+ */
+ case NKqp::TKqpExecuterEvents::EvStreamData: {
+ auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
+
+ Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl);
+ Cerr.Flush();
+
+ Y_ASSERT(record.GetResultSet().rows().size() == 1);
+ Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
+ result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(100);
+
+ runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
+
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ /* Drop message and kill tablet if we already had seen this tablet */
+ case NKqp::TKqpComputeEvents::EvScanData: {
+ if (!firstScanActor) {
+ firstScanActor = ev->Sender;
+ AsyncSplitTable(server, senderSplit, "/Root/table-1", tabletId, 10 /* splitKey */);
+ Cerr << (TStringBuilder() << "-- EvScanData from old tablet " << ev->Sender << ": pass and split" << Endl);
+ Cerr.Flush();
+ } else if (firstScanActor == ev->Sender) {
+ // data from old table scan, drop it
+ Cerr << (TStringBuilder() << "-- EvScanData from old tablet " << ev->Sender << ": drop" << Endl);
+ Cerr.Flush();
+ return TTestActorRuntime::EEventAction::DROP;
+ } else {
+ // data from new tablet scan, pass it
+ Cerr << (TStringBuilder() << "-- EvScanData from new tablet" << ev->Sender << ": pass" << Endl);
+ Cerr.Flush();
+ }
+
+ break;
+ }
+
+ default:
+ break;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ runtime.SetObserverFunc(captureEvents);
+
+ auto streamSender = runtime.AllocateEdgeActor();
+ SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, "SELECT sum(value) FROM `/Root/table-1`;", false));
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender);
+
+ UNIT_ASSERT_VALUES_EQUAL(result, 596400);
+ }
+
Y_UNIT_TEST(ScanDuringSplitThenMerge) {
NKikimrConfig::TAppConfig appCfg;