diff options
| author | Daniil Cherednik <[email protected]> | 2024-09-13 16:04:08 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-09-13 16:04:08 +0200 |
| commit | f8a268370797dd82aa1a9c36b702970127f9c23a (patch) | |
| tree | 85c5a42c801f55cc2ecbb23148261c0c956d82f3 | |
| parent | 159be177cab8d7e985efab122c30bb4c03fcec78 (diff) | |
Make ShardsResolver separate kqp library (#9098)
| -rw-r--r-- | ydb/core/kqp/common/simple/kqp_event_ids.h | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 5 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 11 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 4 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 5 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.cpp (renamed from ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp) | 8 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h (renamed from ydb/core/kqp/executer_actor/kqp_shards_resolver.h) | 0 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h | 21 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/shards_resolver/ya.make | 15 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/ya.make | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 8 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp | 12 |
13 files changed, 57 insertions, 36 deletions
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 8c4b3fcab29..c927c0b1156 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -1,7 +1,6 @@ #pragma once #include <ydb/core/base/events.h> -#include <ydb/library/yql/dq/actors/dq_events_ids.h> namespace NKikimr { namespace NKqp { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 143d02d7157..d139ae51e15 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -5,7 +5,6 @@ #include "kqp_planner.h" #include "kqp_table_resolver.h" #include "kqp_tasks_validate.h" -#include "kqp_shards_resolver.h" #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipecache.h> @@ -299,7 +298,7 @@ public: try { switch (ev->GetTypeRewrite()) { hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); - hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); + hFunc(NShardResolver::TEvShardsResolveStatus, HandleResolve); hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); hFunc(TEvSaveScriptExternalEffectResponse, HandleResolve); hFunc(TEvDescribeSecretsResponse, HandleResolve); @@ -2151,7 +2150,7 @@ private: DoExecute(); } - void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { + void HandleResolve(NShardResolver::TEvShardsResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) { return; } diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 713b56a989a..abd951df281 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -4,6 +4,7 @@ #include <ydb/core/kqp/common/kqp_tx.h> #include <ydb/core/kqp/common/kqp_event_ids.h> #include <ydb/core/kqp/common/kqp_user_request_context.h> +#include <ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h> #include <ydb/core/kqp/query_data/kqp_query_data.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/counters/kqp_counters.h> @@ -86,16 +87,6 @@ struct TEvKqpExecuter { NYql::TIssues Issues; TDuration CpuTime; }; - - struct TEvShardsResolveStatus : public TEventLocal<TEvShardsResolveStatus, - TKqpExecuterEvents::EvShardsResolveStatus> - { - Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::SUCCESS; - NYql::TIssues Issues; - - TMap<ui64, ui64> ShardNodes; - ui32 Unresolved = 0; - }; }; struct TKqpFederatedQuerySetup; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 7ca97673269..fc6a84c3f65 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -6,7 +6,6 @@ #include "kqp_partition_helper.h" #include "kqp_result_channel.h" #include "kqp_table_resolver.h" -#include "kqp_shards_resolver.h" #include <ydb/core/kqp/common/kqp_ru_calc.h> #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> @@ -19,6 +18,7 @@ #include <ydb/library/wilson_ids/wilson.h> #include <ydb/library/ydb_issue/issue_helpers.h> #include <ydb/core/kqp/executer_actor/kqp_tasks_graph.h> +#include <ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/common/kqp_yql.h> @@ -210,7 +210,7 @@ protected: } [[nodiscard]] - bool HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { + bool HandleResolve(NShardResolver::TEvShardsResolveStatus::TPtr& ev) { auto& reply = *ev->Get(); KqpShardsResolverId = {}; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 876c704e0a5..8115e81459a 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -1,7 +1,6 @@ #include "kqp_executer_stats.h" #include "kqp_planner.h" #include "kqp_planner_strategy.h" -#include "kqp_shards_resolver.h" #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/base/appdata.h> diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 28a17b9a5ef..a42f1234a31 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -3,7 +3,6 @@ #include "kqp_partition_helper.h" #include "kqp_tasks_graph.h" #include "kqp_tasks_validate.h" -#include "kqp_shards_resolver.h" #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipecache.h> @@ -80,7 +79,7 @@ public: try { switch (ev->GetTypeRewrite()) { hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); - hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); + hFunc(NShardResolver::TEvShardsResolveStatus, HandleResolve); hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); default: @@ -153,7 +152,7 @@ private: } } - void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { + void HandleResolve(NShardResolver::TEvShardsResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; GetResourcesSnapshot(); } diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.cpp index 7b20b791436..de6705e2d40 100644 --- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp +++ b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.cpp @@ -1,8 +1,7 @@ #include "kqp_shards_resolver.h" +#include "kqp_shards_resolver_events.h" #include <ydb/core/base/tablet_pipecache.h> -#include <ydb/core/kqp/executer_actor/kqp_executer.h> -#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/hfunc.h> @@ -87,7 +86,6 @@ private: LOG_W(reply); ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, std::move(reply)); return; - } ++retryCount; @@ -95,7 +93,7 @@ private: } void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, TString&& message) { - auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>(); + auto replyEv = std::make_unique<NShardResolver::TEvShardsResolveStatus>(); replyEv->Status = status; replyEv->Issues.AddIssue(TIssue(message)); Send(Owner, replyEv.release()); @@ -103,7 +101,7 @@ private: } void ReplyAndDie() { - auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>(); + auto replyEv = std::make_unique<NShardResolver::TEvShardsResolveStatus>(); replyEv->ShardNodes = std::move(Result); Send(Owner, replyEv.release()); PassAway(); diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h index 7883c89e08e..7883c89e08e 100644 --- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h +++ b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver.h diff --git a/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h new file mode 100644 index 00000000000..972eb507d2f --- /dev/null +++ b/ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h @@ -0,0 +1,21 @@ +#pragma once + +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <ydb/core/kqp/common/simple/kqp_event_ids.h> + +namespace NKikimr::NKqp { + +namespace NShardResolver { + struct TEvShardsResolveStatus : public TEventLocal<TEvShardsResolveStatus, + TKqpExecuterEvents::EvShardsResolveStatus> + { + Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::SUCCESS; + NYql::TIssues Issues; + + TMap<ui64, ui64> ShardNodes; + ui32 Unresolved = 0; + }; +}; + +} diff --git a/ydb/core/kqp/executer_actor/shards_resolver/ya.make b/ydb/core/kqp/executer_actor/shards_resolver/ya.make new file mode 100644 index 00000000000..25ca5860cd0 --- /dev/null +++ b/ydb/core/kqp/executer_actor/shards_resolver/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + kqp_shards_resolver.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/core/actorlib_impl + ydb/core/base +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/kqp/executer_actor/ya.make b/ydb/core/kqp/executer_actor/ya.make index 9f8be9bb9fe..8b6a6f5119a 100644 --- a/ydb/core/kqp/executer_actor/ya.make +++ b/ydb/core/kqp/executer_actor/ya.make @@ -11,7 +11,6 @@ SRCS( kqp_partition_helper.cpp kqp_planner.cpp kqp_planner_strategy.cpp - kqp_shards_resolver.cpp kqp_result_channel.cpp kqp_table_resolver.cpp kqp_tasks_graph.cpp @@ -27,6 +26,7 @@ PEERDIR( ydb/core/formats ydb/core/kqp/common ydb/core/kqp/compute_actor + ydb/core/kqp/executer_actor/shards_resolver ydb/core/kqp/federated_query ydb/core/kqp/query_compiler ydb/core/kqp/rm_service diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 635efa2c00b..099b664aecd 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1366,7 +1366,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { switch (ev->GetTypeRewrite()) { case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { Cerr << "-- nodeId: " << nodeId << Endl; nodeId = runtime->GetNodeId(num); @@ -1454,7 +1454,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { switch (ev->GetTypeRewrite()) { case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { Cerr << "-- nodeId: " << nodeId << Endl; nodeId = runtime->GetNodeId(0); @@ -1518,7 +1518,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { switch (ev->GetTypeRewrite()) { case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { Cerr << "-- nodeId: " << nodeId << Endl; nodeId = runtime->GetNodeId(0); @@ -1588,7 +1588,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { switch (ev->GetTypeRewrite()) { case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { Cerr << "-- nodeId: " << nodeId << Endl; nodeId = runtime->GetNodeId(0); diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index 40961b9025b..32bbaf54f5a 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { * Trick executor to think that all datashard are located on node 1. */ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { Cerr << "-- nodeId: " << nodeId << Endl; Cerr.Flush(); @@ -193,7 +193,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { * Trick executor to think that all datashard are located on node 1. */ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { nodeId = firstNodeId; } @@ -287,7 +287,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto captureEvents = [&](TAutoPtr<IEventHandle> &ev) { switch (ev->GetTypeRewrite()) { case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { tabletId = shardId; Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl); @@ -405,7 +405,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto captureEvents = [&](TAutoPtr<IEventHandle> &ev) { switch (ev->GetTypeRewrite()) { case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { tabletId = shardId; Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl); @@ -535,7 +535,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto captureEvents = [&](TAutoPtr<IEventHandle> &ev) { switch (ev->GetTypeRewrite()) { case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { tabletId = shardId; Cerr << (TStringBuilder() << "-- tabletId= " << tabletId << Endl); @@ -647,7 +647,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { * Trick executor to think that all datashard are located on node 1. */ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { - auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + auto* msg = ev->Get<NKqp::NShardResolver::TEvShardsResolveStatus>(); for (auto& [shardId, nodeId]: msg->ShardNodes) { Cerr << "-- nodeId: " << nodeId << Endl; nodeId = runtime.GetNodeId(0); |
