diff options
author | gvit <gvit@ydb.tech> | 2022-09-23 10:36:58 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-09-23 10:36:58 +0300 |
commit | ec4303976cef8f0b57309321bbedf913498e496c (patch) | |
tree | 2c10bdadd8613c9c72abef88b23a0ecc3a03aebf | |
parent | 68619df7fbbc0c6cc67e54879a977cc1790c8e1e (diff) | |
download | ydb-ec4303976cef8f0b57309321bbedf913498e496c.tar.gz |
populate input channels updates
-rw-r--r-- | ydb/core/kqp/executer/kqp_executer_impl.h | 19 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 90 |
2 files changed, 107 insertions, 2 deletions
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index 435c8d988f..9e43ae173d 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -244,7 +244,7 @@ protected: LOG_T("Collect channels updates for task: " << task.Id << " at actor " << task.ComputeActorId); - // N.B. update only output channels + auto& selfUpdates = updates[task.ComputeActorId]; for (auto& input : task.Inputs) { for (auto channelId : input.Channels) { @@ -255,6 +255,7 @@ protected: auto& srcTask = TasksGraph.GetTask(channel.SrcTask); if (srcTask.ComputeActorId) { updates[srcTask.ComputeActorId].emplace(channelId); + selfUpdates.emplace(channelId); } LOG_T("Task: " << task.Id << ", input channelId: " << channelId << ", src task: " << channel.SrcTask @@ -264,7 +265,21 @@ protected: for (auto& output : task.Outputs) { for (auto channelId : output.Channels) { - updates[task.ComputeActorId].emplace(channelId); + selfUpdates.emplace(channelId); + + auto& channel = TasksGraph.GetChannel(channelId); + YQL_ENSURE(channel.SrcTask == task.Id); + + if (channel.DstTask) { + auto& dstTask = TasksGraph.GetTask(channel.DstTask); + if (dstTask.ComputeActorId) { + // not a optimal solution + updates[dstTask.ComputeActorId].emplace(channelId); + } + + LOG_T("Task: " << task.Id << ", output channelId: " << channelId << ", dst task: " << channel.DstTask + << ", at actor " << dstTask.ComputeActorId); + } } } } diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index 6d01fbf436..1c290d31b4 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -7,6 +7,7 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/core/kqp/executer/kqp_executer.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/datashard/datashard_ut_common_kqp.h> @@ -2011,6 +2012,95 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST(SelectLimit1ManyShards) { + TPortManager tp; + ui16 mbusport = tp.GetPort(2134); + auto settings = Tests::TServerSettings(mbusport) + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetNodeCount(2); + + + Tests::TServer::TPtr server = new Tests::TServer(settings); + auto runtime = server->GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + + InitRoot(server, sender); + EnableDebugLogging(runtime); + + const ui32 numShards = 10; + const ui32 numIterations = 10; + CreateTestOlapTable(*server, "selectTable", "selectStore", numShards, numShards); + ui32 insertRows = 0; + for(ui64 i = 0; i < numIterations; ++i) { + SendDataViaActorSystem(runtime, "/Root/selectStore/selectTable", 0, 1000000 + i*1000000, 2000); + insertRows += 2000; + } + + ui64 result = 0; + + std::vector<TAutoPtr<IEventHandle>> evs; + ui32 num = 0; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { + switch (ev->GetTypeRewrite()) { + case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { + + auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + for (auto& [shardId, nodeId]: msg->ShardNodes) { + Cerr << "-- nodeId: " << nodeId << Endl; + nodeId = runtime->GetNodeId(num); + ++num; + num = num % 2; + } + break; + } + + case NYql::NDq::TDqComputeEvents::EvChannelData: { + auto& record = ev->Get<NYql::NDq::TEvDqCompute::TEvChannelData>()->Record; + if (record.GetChannelData().GetChannelId() == 2) { + Cerr << (TStringBuilder() << "captured event for the second channel" << Endl); + Cerr.Flush(); + } + + Cerr << (TStringBuilder() << "-- EvChannelData: " << record.AsJSON() << Endl); + Cerr.Flush(); + + if (record.GetChannelData().GetChannelId() == 2) { + Cerr << (TStringBuilder() << "captured event for the second channel" << Endl); + evs.push_back(ev); + return TTestActorRuntime::EEventAction::DROP; + } else { + return TTestActorRuntime::EEventAction::PROCESS; + } + } + + case NKqp::TKqpExecuterEvents::EvStreamData: { + auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; + + Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl); + Cerr.Flush(); + + Y_ASSERT(record.GetResultSet().rows().size() == 1); + result = 1; + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(100); + runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + + runtime->SetObserverFunc(captureEvents); + auto streamSender = runtime->AllocateEdgeActor(); + SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT * FROM `/Root/selectStore/selectTable` LIMIT 1;", false)); + auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender); + UNIT_ASSERT_VALUES_EQUAL(result, 1); + } + Y_UNIT_TEST(ManyColumnShards) { TPortManager tp; ui16 mbusport = tp.GetPort(2134); |