aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-09-23 10:36:58 +0300
committergvit <gvit@ydb.tech>2022-09-23 10:36:58 +0300
commitec4303976cef8f0b57309321bbedf913498e496c (patch)
tree2c10bdadd8613c9c72abef88b23a0ecc3a03aebf
parent68619df7fbbc0c6cc67e54879a977cc1790c8e1e (diff)
downloadydb-ec4303976cef8f0b57309321bbedf913498e496c.tar.gz
populate input channels updates
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.h19
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp90
2 files changed, 107 insertions, 2 deletions
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index 435c8d988f..9e43ae173d 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -244,7 +244,7 @@ protected:
LOG_T("Collect channels updates for task: " << task.Id << " at actor " << task.ComputeActorId);
- // N.B. update only output channels
+ auto& selfUpdates = updates[task.ComputeActorId];
for (auto& input : task.Inputs) {
for (auto channelId : input.Channels) {
@@ -255,6 +255,7 @@ protected:
auto& srcTask = TasksGraph.GetTask(channel.SrcTask);
if (srcTask.ComputeActorId) {
updates[srcTask.ComputeActorId].emplace(channelId);
+ selfUpdates.emplace(channelId);
}
LOG_T("Task: " << task.Id << ", input channelId: " << channelId << ", src task: " << channel.SrcTask
@@ -264,7 +265,21 @@ protected:
for (auto& output : task.Outputs) {
for (auto channelId : output.Channels) {
- updates[task.ComputeActorId].emplace(channelId);
+ selfUpdates.emplace(channelId);
+
+ auto& channel = TasksGraph.GetChannel(channelId);
+ YQL_ENSURE(channel.SrcTask == task.Id);
+
+ if (channel.DstTask) {
+ auto& dstTask = TasksGraph.GetTask(channel.DstTask);
+ if (dstTask.ComputeActorId) {
+ // not a optimal solution
+ updates[dstTask.ComputeActorId].emplace(channelId);
+ }
+
+ LOG_T("Task: " << task.Id << ", output channelId: " << channelId << ", dst task: " << channel.DstTask
+ << ", at actor " << dstTask.ComputeActorId);
+ }
}
}
}
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 6d01fbf436..1c290d31b4 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -7,6 +7,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/core/kqp/executer/kqp_executer.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/datashard/datashard_ut_common_kqp.h>
@@ -2011,6 +2012,95 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST(SelectLimit1ManyShards) {
+ TPortManager tp;
+ ui16 mbusport = tp.GetPort(2134);
+ auto settings = Tests::TServerSettings(mbusport)
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetNodeCount(2);
+
+
+ Tests::TServer::TPtr server = new Tests::TServer(settings);
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+
+ InitRoot(server, sender);
+ EnableDebugLogging(runtime);
+
+ const ui32 numShards = 10;
+ const ui32 numIterations = 10;
+ CreateTestOlapTable(*server, "selectTable", "selectStore", numShards, numShards);
+ ui32 insertRows = 0;
+ for(ui64 i = 0; i < numIterations; ++i) {
+ SendDataViaActorSystem(runtime, "/Root/selectStore/selectTable", 0, 1000000 + i*1000000, 2000);
+ insertRows += 2000;
+ }
+
+ ui64 result = 0;
+
+ std::vector<TAutoPtr<IEventHandle>> evs;
+ ui32 num = 0;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
+
+ auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ for (auto& [shardId, nodeId]: msg->ShardNodes) {
+ Cerr << "-- nodeId: " << nodeId << Endl;
+ nodeId = runtime->GetNodeId(num);
+ ++num;
+ num = num % 2;
+ }
+ break;
+ }
+
+ case NYql::NDq::TDqComputeEvents::EvChannelData: {
+ auto& record = ev->Get<NYql::NDq::TEvDqCompute::TEvChannelData>()->Record;
+ if (record.GetChannelData().GetChannelId() == 2) {
+ Cerr << (TStringBuilder() << "captured event for the second channel" << Endl);
+ Cerr.Flush();
+ }
+
+ Cerr << (TStringBuilder() << "-- EvChannelData: " << record.AsJSON() << Endl);
+ Cerr.Flush();
+
+ if (record.GetChannelData().GetChannelId() == 2) {
+ Cerr << (TStringBuilder() << "captured event for the second channel" << Endl);
+ evs.push_back(ev);
+ return TTestActorRuntime::EEventAction::DROP;
+ } else {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ }
+
+ case NKqp::TKqpExecuterEvents::EvStreamData: {
+ auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
+
+ Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl);
+ Cerr.Flush();
+
+ Y_ASSERT(record.GetResultSet().rows().size() == 1);
+ result = 1;
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(100);
+ runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+
+ runtime->SetObserverFunc(captureEvents);
+ auto streamSender = runtime->AllocateEdgeActor();
+ SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT * FROM `/Root/selectStore/selectTable` LIMIT 1;", false));
+ auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender);
+ UNIT_ASSERT_VALUES_EQUAL(result, 1);
+ }
+
Y_UNIT_TEST(ManyColumnShards) {
TPortManager tp;
ui16 mbusport = tp.GetPort(2134);