summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp5
-rw-r--r--ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.cpp (renamed from ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp)8
-rw-r--r--ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h (renamed from ydb/core/kqp/executer_actor/kqp_shards_resolver.h)0
-rw-r--r--ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h21
-rw-r--r--ydb/core/kqp/executer_actor/shards_resolver/ya.make15
-rw-r--r--ydb/core/kqp/executer_actor/ya.make2
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp12
13 files changed, 57 insertions, 36 deletions
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index 8c4b3fcab29..c927c0b1156 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -1,7 +1,6 @@
#pragma once
#include <ydb/core/base/events.h>
-#include <ydb/library/yql/dq/actors/dq_events_ids.h>
namespace NKikimr {
namespace NKqp {
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 143d02d7157..d139ae51e15 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -5,7 +5,6 @@
#include "kqp_planner.h"
#include "kqp_table_resolver.h"
#include "kqp_tasks_validate.h"
-#include "kqp_shards_resolver.h"
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipecache.h>
@@ -299,7 +298,7 @@ public:
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
- hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
+ hFunc(NShardResolver::TEvShardsResolveStatus, HandleResolve);
hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
hFunc(TEvSaveScriptExternalEffectResponse, HandleResolve);
hFunc(TEvDescribeSecretsResponse, HandleResolve);
@@ -2151,7 +2150,7 @@ private:
DoExecute();
}
- void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
+ void HandleResolve(NShardResolver::TEvShardsResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) {
return;
}
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 713b56a989a..abd951df281 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -4,6 +4,7 @@
#include <ydb/core/kqp/common/kqp_tx.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/core/kqp/common/kqp_user_request_context.h>
+#include <ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h>
#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -86,16 +87,6 @@ struct TEvKqpExecuter {
NYql::TIssues Issues;
TDuration CpuTime;
};
-
- struct TEvShardsResolveStatus : public TEventLocal<TEvShardsResolveStatus,
- TKqpExecuterEvents::EvShardsResolveStatus>
- {
- Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::SUCCESS;
- NYql::TIssues Issues;
-
- TMap<ui64, ui64> ShardNodes;
- ui32 Unresolved = 0;
- };
};
struct TKqpFederatedQuerySetup;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 7ca97673269..fc6a84c3f65 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -6,7 +6,6 @@
#include "kqp_partition_helper.h"
#include "kqp_result_channel.h"
#include "kqp_table_resolver.h"
-#include "kqp_shards_resolver.h"
#include <ydb/core/kqp/common/kqp_ru_calc.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
@@ -19,6 +18,7 @@
#include <ydb/library/wilson_ids/wilson.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/kqp/executer_actor/kqp_tasks_graph.h>
+#include <ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/common/kqp_yql.h>
@@ -210,7 +210,7 @@ protected:
}
[[nodiscard]]
- bool HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
+ bool HandleResolve(NShardResolver::TEvShardsResolveStatus::TPtr& ev) {
auto& reply = *ev->Get();
KqpShardsResolverId = {};
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 876c704e0a5..8115e81459a 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -1,7 +1,6 @@
#include "kqp_executer_stats.h"
#include "kqp_planner.h"
#include "kqp_planner_strategy.h"
-#include "kqp_shards_resolver.h"
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/base/appdata.h>
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 28a17b9a5ef..a42f1234a31 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -3,7 +3,6 @@
#include "kqp_partition_helper.h"
#include "kqp_tasks_graph.h"
#include "kqp_tasks_validate.h"
-#include "kqp_shards_resolver.h"
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipecache.h>
@@ -80,7 +79,7 @@ public:
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
- hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
+ hFunc(NShardResolver::TEvShardsResolveStatus, HandleResolve);
hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
default:
@@ -153,7 +152,7 @@ private:
}
}
- void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
+ void HandleResolve(NShardResolver::TEvShardsResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) return;
GetResourcesSnapshot();
}
diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.cpp
index 7b20b791436..de6705e2d40 100644
--- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp
+++ b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.cpp
@@ -1,8 +1,7 @@
#include "kqp_shards_resolver.h"
+#include "kqp_shards_resolver_events.h"
#include <ydb/core/base/tablet_pipecache.h>
-#include <ydb/core/kqp/executer_actor/kqp_executer.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
@@ -87,7 +86,6 @@ private:
LOG_W(reply);
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, std::move(reply));
return;
-
}
++retryCount;
@@ -95,7 +93,7 @@ private:
}
void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, TString&& message) {
- auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto replyEv = std::make_unique<NShardResolver::TEvShardsResolveStatus>();
replyEv->Status = status;
replyEv->Issues.AddIssue(TIssue(message));
Send(Owner, replyEv.release());
@@ -103,7 +101,7 @@ private:
}
void ReplyAndDie() {
- auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto replyEv = std::make_unique<NShardResolver::TEvShardsResolveStatus>();
replyEv->ShardNodes = std::move(Result);
Send(Owner, replyEv.release());
PassAway();
diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h
index 7883c89e08e..7883c89e08e 100644
--- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h
+++ b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h
diff --git a/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h
new file mode 100644
index 00000000000..972eb507d2f
--- /dev/null
+++ b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
+
+namespace NKikimr::NKqp {
+
+namespace NShardResolver {
+ struct TEvShardsResolveStatus : public TEventLocal<TEvShardsResolveStatus,
+ TKqpExecuterEvents::EvShardsResolveStatus>
+ {
+ Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::SUCCESS;
+ NYql::TIssues Issues;
+
+ TMap<ui64, ui64> ShardNodes;
+ ui32 Unresolved = 0;
+ };
+};
+
+}
diff --git a/ydb/core/kqp/executer_actor/shards_resolver/ya.make b/ydb/core/kqp/executer_actor/shards_resolver/ya.make
new file mode 100644
index 00000000000..25ca5860cd0
--- /dev/null
+++ b/ydb/core/kqp/executer_actor/shards_resolver/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ kqp_shards_resolver.cpp
+)
+
+PEERDIR(
+ ydb/library/actors/core
+ ydb/core/actorlib_impl
+ ydb/core/base
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/kqp/executer_actor/ya.make b/ydb/core/kqp/executer_actor/ya.make
index 9f8be9bb9fe..8b6a6f5119a 100644
--- a/ydb/core/kqp/executer_actor/ya.make
+++ b/ydb/core/kqp/executer_actor/ya.make
@@ -11,7 +11,6 @@ SRCS(
kqp_partition_helper.cpp
kqp_planner.cpp
kqp_planner_strategy.cpp
- kqp_shards_resolver.cpp
kqp_result_channel.cpp
kqp_table_resolver.cpp
kqp_tasks_graph.cpp
@@ -27,6 +26,7 @@ PEERDIR(
ydb/core/formats
ydb/core/kqp/common
ydb/core/kqp/compute_actor
+ ydb/core/kqp/executer_actor/shards_resolver
ydb/core/kqp/federated_query
ydb/core/kqp/query_compiler
ydb/core/kqp/rm_service
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 635efa2c00b..099b664aecd 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1366,7 +1366,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
switch (ev->GetTypeRewrite()) {
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
Cerr << "-- nodeId: " << nodeId << Endl;
nodeId = runtime->GetNodeId(num);
@@ -1454,7 +1454,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
switch (ev->GetTypeRewrite()) {
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
Cerr << "-- nodeId: " << nodeId << Endl;
nodeId = runtime->GetNodeId(0);
@@ -1518,7 +1518,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
switch (ev->GetTypeRewrite()) {
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
Cerr << "-- nodeId: " << nodeId << Endl;
nodeId = runtime->GetNodeId(0);
@@ -1588,7 +1588,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
switch (ev->GetTypeRewrite()) {
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
Cerr << "-- nodeId: " << nodeId << Endl;
nodeId = runtime->GetNodeId(0);
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
index 40961b9025b..32bbaf54f5a 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
@@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
* Trick executor to think that all datashard are located on node 1.
*/
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
Cerr << "-- nodeId: " << nodeId << Endl;
Cerr.Flush();
@@ -193,7 +193,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
* Trick executor to think that all datashard are located on node 1.
*/
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
nodeId = firstNodeId;
}
@@ -287,7 +287,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto captureEvents = [&](TAutoPtr<IEventHandle> &ev) {
switch (ev->GetTypeRewrite()) {
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
tabletId = shardId;
Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl);
@@ -405,7 +405,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto captureEvents = [&](TAutoPtr<IEventHandle> &ev) {
switch (ev->GetTypeRewrite()) {
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
tabletId = shardId;
Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl);
@@ -535,7 +535,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto captureEvents = [&](TAutoPtr<IEventHandle> &ev) {
switch (ev->GetTypeRewrite()) {
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
tabletId = shardId;
Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl);
@@ -647,7 +647,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
* Trick executor to think that all datashard are located on node 1.
*/
case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
- auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>();
for (auto& [shardId, nodeId]: msg->ShardNodes) {
Cerr << "-- nodeId: " << nodeId << Endl;
nodeId = runtime.GetNodeId(0);