diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-30 14:23:44 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-30 14:23:44 +0300 |
commit | ab05acd455e3b82090e26fc2a5b94b61bd9812f1 (patch) | |
tree | 464de27c4275c2ec11fdaef329f72789946fd5d3 | |
parent | 000a0828a30804aaca971eb519441c85b85d51fb (diff) | |
download | ydb-ab05acd455e3b82090e26fc2a5b94b61bd9812f1.tar.gz |
add feature flag to enable new executor features KIKIMR-15067
ref:9f92b1d2d5893ea31a4d728164f418b569a0706e
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 68 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 |
3 files changed, 75 insertions, 1 deletions
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 263cd10011..189ca20da0 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -422,7 +422,12 @@ private: bool sorted) { ui64 nodeId = ShardIdToNodeId.at(shardId); - if (stageInfo.Meta.IsOlap() && sorted) { + bool supportsMultipleOlapShards = AppData()->FeatureFlags.GetEnableKqpScanQueryMultipleOlapShardsReads(); + if (sorted) { + supportsMultipleOlapShards = false; + } + + if (stageInfo.Meta.IsOlap() && !supportsMultipleOlapShards) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; return task; diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index dc1792acce..e3a2125196 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -1429,6 +1429,73 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST_TWIN(ManyColumnShards, UseSessionActor) { + TPortManager tp; + ui16 mbusport = tp.GetPort(2134); + auto settings = Tests::TServerSettings(mbusport) + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetNodeCount(2); + + Tests::TServer::TPtr server = new Tests::TServer(settings); + + server->GetRuntime()->GetAppData().FeatureFlags.SetEnableKqpScanQueryMultipleOlapShardsReads(true); + server->GetRuntime()->GetAppData().FeatureFlags.SetEnableOlapSchemaOperationsForTest(true); + + auto runtime = server->GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + + InitRoot(server, sender); + EnableDebugLogging(runtime); + + CreateTestOlapTable(*server, "largeOlapTable", "largeOlapStore", 1000, 1000); + ui32 insertRows = 0; + for(ui64 i = 0; i < 100; ++i) { + SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); + insertRows += 2000; + } + + ui64 result = 0; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { + switch (ev->GetTypeRewrite()) { + case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { + + auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + for (auto& [shardId, nodeId]: msg->ShardNodes) { + Cerr << "-- nodeId: " << nodeId << Endl; + nodeId = runtime->GetNodeId(0); + } + break; + } + + case NKqp::TKqpExecuterEvents::EvStreamData: { + auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; + + Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl); + Cerr.Flush(); + + Y_ASSERT(record.GetResultSet().rows().size() == 1); + Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); + result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(100); + runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + + runtime->SetObserverFunc(captureEvents); + auto streamSender = runtime->AllocateEdgeActor(); + SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT COUNT(*) FROM `/Root/largeOlapStore/largeOlapTable`;", false)); + auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender); + UNIT_ASSERT_VALUES_EQUAL(result, insertRows); + } + Y_UNIT_TEST_TWIN(ManyColumnShardsWithRestarts, UseSessionActor) { // remove this return when bug with scan is fixed. // todo: KIKIMR-15200 @@ -1443,6 +1510,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Tests::TServer::TPtr server = new Tests::TServer(settings); + server->GetRuntime()->GetAppData().FeatureFlags.SetEnableKqpScanQueryMultipleOlapShardsReads(true); server->GetRuntime()->GetAppData().FeatureFlags.SetEnableOlapSchemaOperationsForTest(true); auto runtime = server->GetRuntime(); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 5212d8624b..c2d14559ff 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -700,6 +700,7 @@ message TFeatureFlags { optional bool EnableAsyncHttpMon = 64 [default = true]; optional bool EnableChangefeeds = 65 [default = true]; optional bool EnableKqpScanQueryStreamLookup = 66 [default = false]; + optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false]; } |