aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-30 14:23:44 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-30 14:23:44 +0300
commitab05acd455e3b82090e26fc2a5b94b61bd9812f1 (patch)
tree464de27c4275c2ec11fdaef329f72789946fd5d3
parent000a0828a30804aaca971eb519441c85b85d51fb (diff)
downloadydb-ab05acd455e3b82090e26fc2a5b94b61bd9812f1.tar.gz
add feature flag to enable new executor features KIKIMR-15067
ref:9f92b1d2d5893ea31a4d728164f418b569a0706e
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp7
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp68
-rw-r--r--ydb/core/protos/config.proto1
3 files changed, 75 insertions, 1 deletions
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 263cd10011..189ca20da0 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -422,7 +422,12 @@ private:
bool sorted)
{
ui64 nodeId = ShardIdToNodeId.at(shardId);
- if (stageInfo.Meta.IsOlap() && sorted) {
+ bool supportsMultipleOlapShards = AppData()->FeatureFlags.GetEnableKqpScanQueryMultipleOlapShardsReads();
+ if (sorted) {
+ supportsMultipleOlapShards = false;
+ }
+
+ if (stageInfo.Meta.IsOlap() && !supportsMultipleOlapShards) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
return task;
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index dc1792acce..e3a2125196 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -1429,6 +1429,73 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST_TWIN(ManyColumnShards, UseSessionActor) {
+ TPortManager tp;
+ ui16 mbusport = tp.GetPort(2134);
+ auto settings = Tests::TServerSettings(mbusport)
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetNodeCount(2);
+
+ Tests::TServer::TPtr server = new Tests::TServer(settings);
+
+ server->GetRuntime()->GetAppData().FeatureFlags.SetEnableKqpScanQueryMultipleOlapShardsReads(true);
+ server->GetRuntime()->GetAppData().FeatureFlags.SetEnableOlapSchemaOperationsForTest(true);
+
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+
+ InitRoot(server, sender);
+ EnableDebugLogging(runtime);
+
+ CreateTestOlapTable(*server, "largeOlapTable", "largeOlapStore", 1000, 1000);
+ ui32 insertRows = 0;
+ for(ui64 i = 0; i < 100; ++i) {
+ SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
+ insertRows += 2000;
+ }
+
+ ui64 result = 0;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
+
+ auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ for (auto& [shardId, nodeId]: msg->ShardNodes) {
+ Cerr << "-- nodeId: " << nodeId << Endl;
+ nodeId = runtime->GetNodeId(0);
+ }
+ break;
+ }
+
+ case NKqp::TKqpExecuterEvents::EvStreamData: {
+ auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
+
+ Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl);
+ Cerr.Flush();
+
+ Y_ASSERT(record.GetResultSet().rows().size() == 1);
+ Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
+ result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(100);
+ runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+
+ runtime->SetObserverFunc(captureEvents);
+ auto streamSender = runtime->AllocateEdgeActor();
+ SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT COUNT(*) FROM `/Root/largeOlapStore/largeOlapTable`;", false));
+ auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender);
+ UNIT_ASSERT_VALUES_EQUAL(result, insertRows);
+ }
+
Y_UNIT_TEST_TWIN(ManyColumnShardsWithRestarts, UseSessionActor) {
// remove this return when bug with scan is fixed.
// todo: KIKIMR-15200
@@ -1443,6 +1510,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Tests::TServer::TPtr server = new Tests::TServer(settings);
+ server->GetRuntime()->GetAppData().FeatureFlags.SetEnableKqpScanQueryMultipleOlapShardsReads(true);
server->GetRuntime()->GetAppData().FeatureFlags.SetEnableOlapSchemaOperationsForTest(true);
auto runtime = server->GetRuntime();
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 5212d8624b..c2d14559ff 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -700,6 +700,7 @@ message TFeatureFlags {
optional bool EnableAsyncHttpMon = 64 [default = true];
optional bool EnableChangefeeds = 65 [default = true];
optional bool EnableKqpScanQueryStreamLookup = 66 [default = false];
+ optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false];
}