diff options
author | gvit <gvit@ydb.tech> | 2023-08-10 15:46:43 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-08-10 17:02:59 +0300 |
commit | 7258a05c46620b534ab2ad6f8207dccfa042bcb3 (patch) | |
tree | f6b07312cb875816e574f799365956db116a1106 | |
parent | 22521ed2e6ba07acd9c2d3f3634d936a5cf3cb6f (diff) | |
download | ydb-7258a05c46620b534ab2ad6f8207dccfa042bcb3.tar.gz |
fix bug with LastKey handling KIKIMR-18887
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp | 116 |
2 files changed, 124 insertions, 0 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 275a1c68726..de296ca58c0 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -367,6 +367,14 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet if (!state.LastKey.empty()) { PendingShards.front().LastKey = std::move(state.LastKey); + while(!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) { + CA_LOG_D("Nothing to read " << PendingShards.front().ToString(KeyColumnTypes)); + auto readShard = std::move(PendingShards.front()); + PendingShards.pop_front(); + PendingShards.front().LastKey = std::move(readShard.LastKey); + } + + YQL_ENSURE(!PendingShards.empty()); } if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE) diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index 271c35a3476..971eb1476aa 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -245,6 +245,122 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT_VALUES_EQUAL(result, 596400); } + Y_UNIT_TEST(ScanDuringSplit10) { + NKikimrConfig::TAppConfig appCfg; + + auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); + rm->SetChannelBufferSize(100); + rm->SetMinChannelBufferSize(100); + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetAppConfig(appCfg) + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + auto senderSplit = runtime.AllocateEdgeActor(); + + EnableLogging(runtime); + + SetSplitMergePartCountLimit(&runtime, -1); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", 1); + ExecSQL(server, sender, FillTableQuery()); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + for (const auto& shard: shards) { + Cerr << (TStringBuilder() << "-- shardId=" << shard << Endl); + Cerr.Flush(); + } + + TSet<TActorId> scans; + TActorId firstScanActor; + ui64 tabletId = 0; + + ui64 result = 0; + + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) { + switch (ev->GetTypeRewrite()) { + case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { + auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + for (auto& [shardId, nodeId]: msg->ShardNodes) { + tabletId = shardId; + Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl); + Cerr.Flush(); + } + break; + } + + case TEvDataShard::EvKqpScan: { + Cerr << (TStringBuilder() << "-- EvScan " << ev->Sender << " -> " << ev->Recipient << Endl); + Cerr.Flush(); + break; + } + + /* + * Respond to streamData with acks. Without that execution pipeline will stop + * producing new tuples. + */ + case NKqp::TKqpExecuterEvents::EvStreamData: { + auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; + + Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl); + Cerr.Flush(); + + Y_ASSERT(record.GetResultSet().rows().size() == 1); + Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); + result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(100); + + runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); + + return TTestActorRuntime::EEventAction::DROP; + } + + /* Drop message and kill tablet if we already had seen this tablet */ + case NKqp::TKqpComputeEvents::EvScanData: { + if (!firstScanActor) { + firstScanActor = ev->Sender; + AsyncSplitTable(server, senderSplit, "/Root/table-1", tabletId, 10 /* splitKey */); + Cerr << (TStringBuilder() << "-- EvScanData from old tablet " << ev->Sender << ": pass and split" << Endl); + Cerr.Flush(); + } else if (firstScanActor == ev->Sender) { + // data from old table scan, drop it + Cerr << (TStringBuilder() << "-- EvScanData from old tablet " << ev->Sender << ": drop" << Endl); + Cerr.Flush(); + return TTestActorRuntime::EEventAction::DROP; + } else { + // data from new tablet scan, pass it + Cerr << (TStringBuilder() << "-- EvScanData from new tablet" << ev->Sender << ": pass" << Endl); + Cerr.Flush(); + } + + break; + } + + default: + break; + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(captureEvents); + + auto streamSender = runtime.AllocateEdgeActor(); + SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, "SELECT sum(value) FROM `/Root/table-1`;", false)); + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender); + + UNIT_ASSERT_VALUES_EQUAL(result, 596400); + } + Y_UNIT_TEST(ScanDuringSplitThenMerge) { NKikimrConfig::TAppConfig appCfg; |