aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-04 16:14:13 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-04 16:14:13 +0300
commit6b1bed39e996ecde5d9637b97612d7c4f36467f3 (patch)
tree57f784dbc9386690f4811df668a6fd0f81e53e05
parent3b3c4be949271feb82f21388e31fb56cb7666062 (diff)
downloadydb-6b1bed39e996ecde5d9637b97612d7c4f36467f3.tar.gz
split kqp common
-rw-r--r--ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kqp/common/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/common/compilation/events.cpp5
-rw-r--r--ydb/core/kqp/common/compilation/events.h82
-rw-r--r--ydb/core/kqp/common/compilation/result.cpp1
-rw-r--r--ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt10
-rw-r--r--ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt10
-rw-r--r--ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt10
-rw-r--r--ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt10
-rw-r--r--ydb/core/kqp/common/events/events.cpp5
-rw-r--r--ydb/core/kqp/common/events/events.h124
-rw-r--r--ydb/core/kqp/common/events/process_response.h2
-rw-r--r--ydb/core/kqp/common/events/query.cpp5
-rw-r--r--ydb/core/kqp/common/events/query.h388
-rw-r--r--ydb/core/kqp/common/kqp.cpp40
-rw-r--r--ydb/core/kqp/common/kqp.h635
-rw-r--r--ydb/core/kqp/common/kqp_event_ids.h2
-rw-r--r--ydb/core/kqp/common/shutdown/CMakeLists.darwin-x86_64.txt24
-rw-r--r--ydb/core/kqp/common/shutdown/CMakeLists.linux-aarch64.txt25
-rw-r--r--ydb/core/kqp/common/shutdown/CMakeLists.linux-x86_64.txt25
-rw-r--r--ydb/core/kqp/common/shutdown/CMakeLists.txt17
-rw-r--r--ydb/core/kqp/common/shutdown/CMakeLists.windows-x86_64.txt24
-rw-r--r--ydb/core/kqp/common/shutdown/controller.cpp45
-rw-r--r--ydb/core/kqp/common/shutdown/controller.h26
-rw-r--r--ydb/core/kqp/common/shutdown/events.cpp5
-rw-r--r--ydb/core/kqp/common/shutdown/events.h28
-rw-r--r--ydb/core/kqp/common/shutdown/state.cpp17
-rw-r--r--ydb/core/kqp/common/shutdown/state.h36
-rw-r--r--ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.cpp (renamed from ydb/core/kqp/common/events/kqp_event_ids.cpp)0
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h (renamed from ydb/core/kqp/common/events/kqp_event_ids.h)0
-rw-r--r--ydb/core/kqp/common/simple/services.cpp5
-rw-r--r--ydb/core/kqp/common/simple/services.h39
42 files changed, 996 insertions, 677 deletions
diff --git a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
index 35587b33802..c40b6b7675a 100644
--- a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compilation)
add_subdirectory(events)
+add_subdirectory(shutdown)
add_subdirectory(simple)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -35,6 +36,7 @@ target_link_libraries(core-kqp-common PUBLIC
kqp-common-simple
kqp-common-compilation
kqp-common-events
+ kqp-common-shutdown
core-kqp-provider
tx-long_tx_service-public
core-tx-sharding
diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
index a680d797473..c469afdc8b0 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compilation)
add_subdirectory(events)
+add_subdirectory(shutdown)
add_subdirectory(simple)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -36,6 +37,7 @@ target_link_libraries(core-kqp-common PUBLIC
kqp-common-simple
kqp-common-compilation
kqp-common-events
+ kqp-common-shutdown
core-kqp-provider
tx-long_tx_service-public
core-tx-sharding
diff --git a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
index a680d797473..c469afdc8b0 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compilation)
add_subdirectory(events)
+add_subdirectory(shutdown)
add_subdirectory(simple)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -36,6 +37,7 @@ target_link_libraries(core-kqp-common PUBLIC
kqp-common-simple
kqp-common-compilation
kqp-common-events
+ kqp-common-shutdown
core-kqp-provider
tx-long_tx_service-public
core-tx-sharding
diff --git a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
index 35587b33802..c40b6b7675a 100644
--- a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compilation)
add_subdirectory(events)
+add_subdirectory(shutdown)
add_subdirectory(simple)
get_built_tool_path(
TOOL_enum_parser_bin
@@ -35,6 +36,7 @@ target_link_libraries(core-kqp-common PUBLIC
kqp-common-simple
kqp-common-compilation
kqp-common-events
+ kqp-common-shutdown
core-kqp-provider
tx-long_tx_service-public
core-tx-sharding
diff --git a/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt
index 23acfee25cc..f959492fd80 100644
--- a/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt
@@ -20,4 +20,5 @@ target_link_libraries(kqp-common-compilation PUBLIC
)
target_sources(kqp-common-compilation PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp
)
diff --git a/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt
index 905a8b9a14c..8145ada4823 100644
--- a/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt
@@ -21,4 +21,5 @@ target_link_libraries(kqp-common-compilation PUBLIC
)
target_sources(kqp-common-compilation PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp
)
diff --git a/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt
index 905a8b9a14c..8145ada4823 100644
--- a/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt
@@ -21,4 +21,5 @@ target_link_libraries(kqp-common-compilation PUBLIC
)
target_sources(kqp-common-compilation PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp
)
diff --git a/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt
index 23acfee25cc..f959492fd80 100644
--- a/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt
@@ -20,4 +20,5 @@ target_link_libraries(kqp-common-compilation PUBLIC
)
target_sources(kqp-common-compilation PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp
)
diff --git a/ydb/core/kqp/common/compilation/events.cpp b/ydb/core/kqp/common/compilation/events.cpp
new file mode 100644
index 00000000000..4a55f64db54
--- /dev/null
+++ b/ydb/core/kqp/common/compilation/events.cpp
@@ -0,0 +1,5 @@
+#include "events.h"
+
+namespace NKikimr::NKqp {
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h
new file mode 100644
index 00000000000..c9678caecb1
--- /dev/null
+++ b/ydb/core/kqp/common/compilation/events.h
@@ -0,0 +1,82 @@
+#pragma once
+#include "result.h"
+
+#include <library/cpp/actors/core/event_local.h>
+#include <ydb/library/aclib/aclib.h>
+#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
+#include <ydb/core/kqp/common/simple/query_id.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
+namespace NKikimr::NKqp::NPrivateEvents {
+
+struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
+ TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query,
+ bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
+ : UserToken(userToken)
+ , Uid(uid)
+ , Query(std::move(query))
+ , KeepInCache(keepInCache)
+ , Deadline(deadline)
+ , DbCounters(dbCounters)
+ , Orbit(std::move(orbit)) {
+ Y_ENSURE(Uid.Defined() != Query.Defined());
+ }
+
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ TMaybe<TString> Uid;
+ TMaybe<TKqpQueryId> Query;
+ bool KeepInCache = false;
+ // it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
+ TInstant Deadline;
+ TKqpDbCountersPtr DbCounters;
+ TMaybe<bool> DocumentApiRestricted;
+
+ NLWTrace::TOrbit Orbit;
+};
+
+struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
+ TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query,
+ TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
+ : UserToken(userToken)
+ , Uid(uid)
+ , Query(query)
+ , Deadline(deadline)
+ , DbCounters(dbCounters)
+ , Orbit(std::move(orbit)) {
+ }
+
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ TString Uid;
+ TMaybe<TKqpQueryId> Query;
+
+ TInstant Deadline;
+ TKqpDbCountersPtr DbCounters;
+
+ NLWTrace::TOrbit Orbit;
+};
+
+struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
+ TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {})
+ : CompileResult(compileResult)
+ , Orbit(std::move(orbit)) {
+ }
+
+ TKqpCompileResult::TConstPtr CompileResult;
+ NKqpProto::TKqpStatsCompile Stats;
+ std::optional<TString> ReplayMessage;
+
+ NLWTrace::TOrbit Orbit;
+};
+
+struct TEvCompileInvalidateRequest: public TEventLocal<TEvCompileInvalidateRequest,
+ TKqpEvents::EvCompileInvalidateRequest> {
+ TEvCompileInvalidateRequest(const TString& uid, TKqpDbCountersPtr dbCounters)
+ : Uid(uid)
+ , DbCounters(dbCounters) {
+ }
+
+ TString Uid;
+ TKqpDbCountersPtr DbCounters;
+};
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/compilation/result.cpp b/ydb/core/kqp/common/compilation/result.cpp
index 138843cf36b..292cd0d03c4 100644
--- a/ydb/core/kqp/common/compilation/result.cpp
+++ b/ydb/core/kqp/common/compilation/result.cpp
@@ -1,5 +1,4 @@
#include "result.h"
-#include <ydb/core/kqp/query_data/kqp_prepared_query.h>
namespace NKikimr::NKqp {
diff --git a/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt
index 2e48fa7671d..219ea9df7fb 100644
--- a/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt
@@ -16,8 +16,16 @@ target_link_libraries(kqp-common-events PUBLIC
yutil
ydb-core-protos
ydb-core-base
+ core-grpc_services-base
+ core-grpc_services-cancelation
+ kqp-common-shutdown
+ kqp-common-compilation
+ yql-dq-actors
+ api-protos
+ cpp-actors-core
)
target_sources(kqp-common-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp
)
diff --git a/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt
index 1013ee8365b..f01f66089fe 100644
--- a/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt
@@ -17,8 +17,16 @@ target_link_libraries(kqp-common-events PUBLIC
yutil
ydb-core-protos
ydb-core-base
+ core-grpc_services-base
+ core-grpc_services-cancelation
+ kqp-common-shutdown
+ kqp-common-compilation
+ yql-dq-actors
+ api-protos
+ cpp-actors-core
)
target_sources(kqp-common-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp
)
diff --git a/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt
index 1013ee8365b..f01f66089fe 100644
--- a/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt
@@ -17,8 +17,16 @@ target_link_libraries(kqp-common-events PUBLIC
yutil
ydb-core-protos
ydb-core-base
+ core-grpc_services-base
+ core-grpc_services-cancelation
+ kqp-common-shutdown
+ kqp-common-compilation
+ yql-dq-actors
+ api-protos
+ cpp-actors-core
)
target_sources(kqp-common-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp
)
diff --git a/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt
index 2e48fa7671d..219ea9df7fb 100644
--- a/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt
@@ -16,8 +16,16 @@ target_link_libraries(kqp-common-events PUBLIC
yutil
ydb-core-protos
ydb-core-base
+ core-grpc_services-base
+ core-grpc_services-cancelation
+ kqp-common-shutdown
+ kqp-common-compilation
+ yql-dq-actors
+ api-protos
+ cpp-actors-core
)
target_sources(kqp-common-events PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp
)
diff --git a/ydb/core/kqp/common/events/events.cpp b/ydb/core/kqp/common/events/events.cpp
new file mode 100644
index 00000000000..4a55f64db54
--- /dev/null
+++ b/ydb/core/kqp/common/events/events.cpp
@@ -0,0 +1,5 @@
+#include "events.h"
+
+namespace NKikimr::NKqp {
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h
new file mode 100644
index 00000000000..9a0702f849b
--- /dev/null
+++ b/ydb/core/kqp/common/events/events.h
@@ -0,0 +1,124 @@
+#pragma once
+
+#include "process_response.h"
+#include "query.h"
+#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
+#include <ydb/core/protos/kqp.pb.h>
+#include <ydb/core/kqp/common/compilation/events.h>
+#include <ydb/core/kqp/common/shutdown/events.h>
+#include <ydb/public/api/protos/draft/ydb_query.pb.h>
+#include <ydb/library/yql/dq/actors/dq.h>
+
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/core/event_load.h>
+#include <contrib/libs/protobuf/src/google/protobuf/map.h>
+
+namespace NKikimr::NKqp {
+
+struct TEvKqp {
+ using TEvQueryRequestRemote = NPrivateEvents::TEvQueryRequestRemote;
+
+ using TEvProcessResponse = NPrivateEvents::TEvProcessResponse;
+
+ using TEvQueryRequest = NPrivateEvents::TEvQueryRequest;
+
+ struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest,
+ NKikimrKqp::TEvCloseSessionRequest, TKqpEvents::EvCloseSessionRequest> {};
+
+ struct TEvCreateSessionRequest : public TEventPB<TEvCreateSessionRequest,
+ NKikimrKqp::TEvCreateSessionRequest, TKqpEvents::EvCreateSessionRequest> {};
+
+ struct TEvPingSessionRequest : public TEventPB<TEvPingSessionRequest,
+ NKikimrKqp::TEvPingSessionRequest, TKqpEvents::EvPingSessionRequest> {};
+
+
+ using TEvCompileRequest = NPrivateEvents::TEvCompileRequest;
+ using TEvRecompileRequest = NPrivateEvents::TEvRecompileRequest;
+ using TEvCompileResponse = NPrivateEvents::TEvCompileResponse;
+ using TEvCompileInvalidateRequest = NPrivateEvents::TEvCompileInvalidateRequest;
+
+ using TEvInitiateSessionShutdown = NKikimr::NKqp::NPrivateEvents::TEvInitiateSessionShutdown;
+ using TEvContinueShutdown = NKikimr::NKqp::NPrivateEvents::TEvContinueShutdown;
+
+ using TEvDataQueryStreamPart = NPrivateEvents::TEvDataQueryStreamPart;
+
+ struct TEvDataQueryStreamPartAck : public TEventLocal<TEvDataQueryStreamPartAck, TKqpEvents::EvDataQueryStreamPartAck> {};
+
+ template <typename TProto>
+ using TProtoArenaHolder = NPrivateEvents::TProtoArenaHolder<TProto>;
+
+ using TEvQueryResponse = NPrivateEvents::TEvQueryResponse;
+
+ struct TEvCreateSessionResponse : public TEventPB<TEvCreateSessionResponse,
+ NKikimrKqp::TEvCreateSessionResponse, TKqpEvents::EvCreateSessionResponse> {};
+
+ struct TEvContinueProcess : public TEventLocal<TEvContinueProcess, TKqpEvents::EvContinueProcess> {
+ TEvContinueProcess(ui32 queryId, bool finished)
+ : QueryId(queryId)
+ , Finished(finished) {}
+
+ ui32 QueryId;
+ bool Finished;
+ };
+
+ using TEvQueryTimeout = NPrivateEvents::TEvQueryTimeout;
+
+ struct TEvIdleTimeout : public TEventLocal<TEvIdleTimeout, TKqpEvents::EvIdleTimeout> {
+ TEvIdleTimeout(ui32 timerId)
+ : TimerId(timerId) {}
+
+ ui32 TimerId;
+ };
+
+ struct TEvCloseSessionResponse : public TEventPB<TEvCloseSessionResponse,
+ NKikimrKqp::TEvCloseSessionResponse, TKqpEvents::EvCloseSessionResponse> {};
+
+ struct TEvPingSessionResponse : public TEventPB<TEvPingSessionResponse,
+ NKikimrKqp::TEvPingSessionResponse, TKqpEvents::EvPingSessionResponse> {};
+
+ struct TEvKqpProxyPublishRequest :
+ public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {};
+
+ using TEvInitiateShutdownRequest = NPrivateEvents::TEvInitiateShutdownRequest;
+
+ struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
+ TEvScriptRequest() = default;
+
+ mutable NKikimrKqp::TEvQueryRequest Record;
+ };
+
+ struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> {
+ TEvScriptResponse(TString operationId, TString executionId, Ydb::Query::ExecStatus execStatus, Ydb::Query::ExecMode execMode)
+ : Status(Ydb::StatusIds::SUCCESS)
+ , OperationId(std::move(operationId))
+ , ExecutionId(std::move(executionId))
+ , ExecStatus(execStatus)
+ , ExecMode(execMode)
+ {}
+
+ TEvScriptResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Status(status)
+ , Issues(std::move(issues))
+ , ExecStatus(Ydb::Query::EXEC_STATUS_FAILED)
+ , ExecMode(Ydb::Query::EXEC_MODE_UNSPECIFIED)
+ {}
+
+ const Ydb::StatusIds::StatusCode Status;
+ const NYql::TIssues Issues;
+ const TString OperationId;
+ const TString ExecutionId;
+ const Ydb::Query::ExecStatus ExecStatus;
+ const Ydb::Query::ExecMode ExecMode;
+ };
+
+ using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution;
+
+ struct TEvFetchScriptResultsRequest : public TEventPB<TEvFetchScriptResultsRequest, NKikimrKqp::TEvFetchScriptResultsRequest, TKqpEvents::EvFetchScriptResultsRequest> {
+ };
+
+ struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> {
+ };
+};
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/events/process_response.h b/ydb/core/kqp/common/events/process_response.h
index bdae89bb29d..3c0adea39ce 100644
--- a/ydb/core/kqp/common/events/process_response.h
+++ b/ydb/core/kqp/common/events/process_response.h
@@ -2,7 +2,7 @@
#include <ydb/core/protos/kqp.pb.h>
#include <library/cpp/actors/core/event_pb.h>
#include <util/generic/ptr.h>
-#include "kqp_event_ids.h"
+#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
namespace NKikimr::NKqp::NPrivateEvents {
diff --git a/ydb/core/kqp/common/events/query.cpp b/ydb/core/kqp/common/events/query.cpp
new file mode 100644
index 00000000000..b6724b22ae7
--- /dev/null
+++ b/ydb/core/kqp/common/events/query.cpp
@@ -0,0 +1,5 @@
+#include "query.h"
+
+namespace NKikimr::NKqp {
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
new file mode 100644
index 00000000000..c439cfffc06
--- /dev/null
+++ b/ydb/core/kqp/common/events/query.h
@@ -0,0 +1,388 @@
+#pragma once
+#include <ydb/core/protos/kqp.pb.h>
+#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
+#include <ydb/core/grpc_services/cancelation/cancelation.h>
+
+#include <ydb/public/api/protos/draft/ydb_query.pb.h>
+#include <ydb/library/aclib/aclib.h>
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/event_local.h>
+
+namespace NKikimr::NKqp::NPrivateEvents {
+
+struct TEvQueryRequestRemote: public TEventPB<TEvQueryRequestRemote, NKikimrKqp::TEvQueryRequest,
+ TKqpEvents::EvQueryRequest> {
+};
+
+struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
+public:
+ TEvQueryRequest(
+ NKikimrKqp::EQueryAction queryAction,
+ NKikimrKqp::EQueryType queryType,
+ TActorId requestActorId,
+ const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx,
+ const TString& sessionId,
+ TString&& yqlText,
+ TString&& queryId,
+ const ::Ydb::Table::TransactionControl* txControl,
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters,
+ const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
+ const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
+ const ::Ydb::Operations::OperationParams* operationParams,
+ bool keepSession = false);
+
+ TEvQueryRequest() = default;
+
+ bool IsSerializable() const override {
+ return true;
+ }
+
+ TEventSerializationInfo CreateSerializationInfo() const override { return {}; }
+
+ const TString& GetDatabase() const {
+ return RequestCtx ? Database : Record.GetRequest().GetDatabase();
+ }
+
+ bool HasYdbStatus() const {
+ return RequestCtx ? false : Record.HasYdbStatus();
+ }
+
+ const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
+ return Record.GetRequest().GetTopicOperations();
+ }
+
+ bool HasTopicOperations() const {
+ return Record.GetRequest().HasTopicOperations();
+ }
+
+ bool GetKeepSession() const {
+ return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession();
+ }
+
+ TDuration GetCancelAfter() const {
+ return RequestCtx ? CancelAfter : TDuration::MilliSeconds(Record.GetRequest().GetCancelAfterMs());
+ }
+
+ TDuration GetOperationTimeout() const {
+ return RequestCtx ? OperationTimeout : TDuration::MilliSeconds(Record.GetRequest().GetTimeoutMs());
+ }
+
+ bool HasAction() const {
+ return RequestCtx ? true : Record.GetRequest().HasAction();
+ }
+
+ void SetSessionId(const TString& sessionId) {
+ if (RequestCtx) {
+ SessionId = sessionId;
+ } else {
+ Record.MutableRequest()->SetSessionId(sessionId);
+ }
+ }
+
+ const TString& GetSessionId() const {
+ return RequestCtx ? SessionId : Record.GetRequest().GetSessionId();
+ }
+
+ NKikimrKqp::EQueryAction GetAction() const {
+ return RequestCtx ? QueryAction : Record.GetRequest().GetAction();
+ }
+
+ NKikimrKqp::EQueryType GetType() const {
+ return RequestCtx ? QueryType : Record.GetRequest().GetType();
+ }
+
+ bool HasPreparedQuery() const {
+ return RequestCtx ? QueryId.size() > 0 : Record.GetRequest().HasPreparedQuery();
+ }
+
+ const TString& GetPreparedQuery() const {
+ return RequestCtx ? QueryId : Record.GetRequest().GetPreparedQuery();
+ }
+
+ const TString& GetQuery() const {
+ return RequestCtx ? YqlText : Record.GetRequest().GetQuery();
+ }
+
+ const ::NKikimrMiniKQL::TParams& GetParameters() const {
+ return Record.GetRequest().GetParameters();
+ }
+
+ const ::Ydb::Table::TransactionControl& GetTxControl() const {
+ return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl();
+ }
+
+ bool GetUsePublicResponseDataFormat() const {
+ return RequestCtx ? true : Record.GetRequest().GetUsePublicResponseDataFormat();
+ }
+
+ bool GetQueryKeepInCache() const {
+ if (RequestCtx) {
+ if (QueryCachePolicy != nullptr) {
+ return QueryCachePolicy->keep_in_cache();
+ }
+ return false;
+ }
+ return Record.GetRequest().GetQueryCachePolicy().keep_in_cache();
+ }
+
+ bool HasTxControl() const {
+ return RequestCtx ? TxControl != nullptr : Record.GetRequest().HasTxControl();
+ }
+
+ bool HasCollectStats() const {
+ return RequestCtx ? true : Record.GetRequest().HasCollectStats();
+ }
+
+ TActorId GetRequestActorId() const {
+ return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId());
+ }
+
+ google::protobuf::Arena* GetArena() {
+ return RequestCtx ? RequestCtx->GetArena() : nullptr;
+ }
+
+ const TString& GetTraceId() const {
+ if (RequestCtx) {
+ if (!TraceId) {
+ TraceId = RequestCtx->GetTraceId().GetOrElse("");
+ }
+ return TraceId;
+ }
+
+ return Record.GetTraceId();
+ }
+
+ const TString& GetRequestType() const {
+ if (RequestCtx) {
+ if (!RequestType) {
+ RequestType = RequestCtx->GetRequestType().GetOrElse("");
+ }
+ return RequestType;
+ }
+
+ return Record.GetRequestType();
+ }
+
+ const TIntrusiveConstPtr<NACLib::TUserToken>& GetUserToken() const {
+ if (RequestCtx && RequestCtx->GetInternalToken()) {
+ return RequestCtx->GetInternalToken();
+ }
+
+ if (Token_) {
+ return Token_;
+ }
+
+ Token_ = new NACLib::TUserToken(Record.GetUserToken());
+ return Token_;
+ }
+
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
+ if (YdbParameters) {
+ return *YdbParameters;
+ }
+
+ return Record.GetRequest().GetYdbParameters();
+ }
+
+ Ydb::StatusIds::StatusCode GetYdbStatus() const {
+ return Record.GetYdbStatus();
+ }
+
+ ::Ydb::Table::QueryStatsCollection::Mode GetCollectStats() const {
+ if (RequestCtx) {
+ return CollectStats;
+ }
+
+ return Record.GetRequest().GetCollectStats();
+ }
+
+ const ::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& GetQueryIssues() const {
+ return Record.GetQueryIssues();
+ }
+
+ ui64 GetRequestSize() const {
+ return Record.GetRequest().ByteSizeLong();
+ }
+
+ ui64 GetQuerySize() const {
+ return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size();
+ }
+
+ bool IsInternalCall() const {
+ return RequestCtx ? RequestCtx->IsInternalCall() : Record.GetRequest().GetIsInternalCall();
+ }
+
+ ui64 GetParametersSize() const {
+ if (ParametersSize > 0) {
+ return ParametersSize;
+ }
+
+ ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong();
+ for (const auto& [name, param] : GetYdbParameters()) {
+ ParametersSize += name.size();
+ ParametersSize += param.ByteSizeLong();
+ }
+
+ return ParametersSize;
+ }
+
+ ui32 CalculateSerializedSize() const override {
+ PrepareRemote();
+ return Record.ByteSize();
+ }
+
+ bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override {
+ PrepareRemote();
+ return Record.SerializeToZeroCopyStream(chunker);
+ }
+
+ static NActors::IEventBase* Load(TEventSerializedData* data) {
+ auto pbEv = THolder<TEvQueryRequestRemote>(static_cast<TEvQueryRequestRemote*>(TEvQueryRequestRemote::Load(data)));
+ auto req = new TEvQueryRequest();
+ req->Record.Swap(&pbEv->Record);
+ return req;
+ }
+
+ void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) {
+ if (RequestCtx) {
+ RequestCtx->SetClientLostAction([actorId, as]() {
+ as->Send(actorId, new NGRpcService::TEvClientLost());
+ });
+ } else if (Record.HasCancelationActor()) {
+ auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor());
+ NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, as);
+ }
+ }
+
+ void PrepareRemote() const;
+
+ mutable NKikimrKqp::TEvQueryRequest Record;
+
+private:
+ mutable ui64 ParametersSize = 0;
+ mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx;
+ mutable TString TraceId;
+ mutable TString RequestType;
+ mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
+ TActorId RequestActorId;
+ TString Database;
+ TString SessionId;
+ TString YqlText;
+ TString QueryId;
+ NKikimrKqp::EQueryAction QueryAction;
+ NKikimrKqp::EQueryType QueryType;
+ const ::Ydb::Table::TransactionControl* TxControl = nullptr;
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr;
+ const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
+ const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr;
+ const ::Ydb::Operations::OperationParams* OperationParams = nullptr;
+ bool KeepSession = false;
+ TDuration OperationTimeout;
+ TDuration CancelAfter;
+};
+
+struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
+ NKikimrKqp::TEvDataQueryStreamPart, TKqpEvents::EvDataQueryStreamPart> {
+};
+
+struct TEvDataQueryStreamPartAck: public TEventLocal<TEvDataQueryStreamPartAck, TKqpEvents::EvDataQueryStreamPartAck> {};
+
+// Wrapper to use Arena allocated protobuf with ActorSystem (for serialization path).
+// Arena deserialization is not supported.
+// TODO: Add arena support to actor system TEventPB?
+template<typename TProto>
+class TProtoArenaHolder: public TNonCopyable {
+public:
+ TProtoArenaHolder()
+ : Protobuf_(google::protobuf::Arena::CreateMessage<TProto>(nullptr))
+ , NeedDelete_(true) {
+ }
+
+ ~TProtoArenaHolder() {
+ // Deallocate message only if it was "normal" allocation
+ // In case of protobuf arena memory will be freed during arena deallocation
+ if (NeedDelete_) {
+ delete Protobuf_;
+ }
+ }
+
+ void Realloc(std::shared_ptr<google::protobuf::Arena> arena) {
+ ReallocRef(arena.get());
+ Arena_ = arena;
+ }
+
+ void ReallocRef(google::protobuf::Arena* arena) {
+ // Allow realloc only if previous allocation was made using "normal" allocator
+ // and no data was writen. It prevents ineffective using of protobuf.
+ Y_ASSERT(!Protobuf_->GetArena());
+ Y_ASSERT(ByteSize() == 0);
+ delete Protobuf_;
+ Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena);
+ if (arena) {
+ NeedDelete_ = false;
+ }
+ }
+
+ bool ParseFromString(const TString& data) {
+ return Protobuf_->ParseFromString(data);
+ }
+
+ bool ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream* input) {
+ return Protobuf_->ParseFromZeroCopyStream(input);
+ }
+
+ bool SerializeToZeroCopyStream(google::protobuf::io::ZeroCopyOutputStream* output) const {
+ return Protobuf_->SerializeToZeroCopyStream(output);
+ }
+
+ bool SerializeToString(TString* output) const {
+ return Protobuf_->SerializeToString(output);
+ }
+
+ int ByteSize() const {
+ return Protobuf_->ByteSize();
+ }
+
+ TString DebugString() const {
+ return Protobuf_->DebugString();
+ }
+
+ TString ShortDebugString() const {
+ return Protobuf_->ShortDebugString();
+ }
+
+ TString GetTypeName() const {
+ return Protobuf_->GetTypeName();
+ }
+
+ const TProto& GetRef() const {
+ return *Protobuf_;
+ }
+
+ TProto& GetRef() {
+ return *Protobuf_;
+ }
+
+private:
+ TProtoArenaHolder(TProtoArenaHolder&&) = default;
+ TProtoArenaHolder& operator=(TProtoArenaHolder&&) = default;
+ TProto* Protobuf_;
+ std::shared_ptr<google::protobuf::Arena> Arena_;
+ bool NeedDelete_;
+};
+
+struct TEvQueryTimeout: public TEventLocal<TEvQueryTimeout, TKqpEvents::EvQueryTimeout> {
+ TEvQueryTimeout(ui32 queryId)
+ : QueryId(queryId) {
+ }
+
+ ui32 QueryId;
+};
+
+struct TEvQueryResponse: public TEventPB<TEvQueryResponse, TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>,
+ TKqpEvents::EvQueryResponse> {
+};
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/kqp.cpp b/ydb/core/kqp/common/kqp.cpp
index fe9f7d85663..7f3bdff3383 100644
--- a/ydb/core/kqp/common/kqp.cpp
+++ b/ydb/core/kqp/common/kqp.cpp
@@ -6,44 +6,4 @@
namespace NKikimr::NKqp {
-TKqpShutdownController::TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool enableGraceful)
- : KqpProxyActorId_(kqpProxyActorId)
- , EnableGraceful(enableGraceful)
- , TableServiceConfig(tableServiceConfig)
-{
- ShutdownState_.Reset(new TKqpShutdownState());
-}
-
-void TKqpShutdownController::Initialize(NActors::TActorSystem* actorSystem) {
- ActorSystem_ = actorSystem;
-}
-
-void TKqpShutdownController::Stop() {
- if (!EnableGraceful)
- return;
-
- ActorSystem_->Send(new NActors::IEventHandle(KqpProxyActorId_, {}, new TEvKqp::TEvInitiateShutdownRequest(ShutdownState_)));
- auto timeout = TDuration::MilliSeconds(TableServiceConfig.GetShutdownSettings().GetShutdownTimeoutMs());
- auto startedAt = TInstant::Now();
- auto spent = (TInstant::Now() - startedAt).SecondsFloat();
- ui32 iteration = 0;
- while (spent < timeout.SecondsFloat()) {
- if (iteration % 30 == 0) {
- Cerr << "Current KQP shutdown state: spent " << spent << " seconds, ";
- if (ShutdownState_->Initialized()) {
- Cerr << ShutdownState_->GetPendingSessions() << " sessions to shutdown" << Endl;
- } else {
- Cerr << "not started yet" << Endl;
- }
- }
-
- if (ShutdownState_->ShutdownComplete())
- break;
-
- Sleep(TDuration::MilliSeconds(100));
- ++iteration;
- spent = (TInstant::Now() - startedAt).SecondsFloat();
- }
-}
-
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 3a8013ca654..9894f5f0b64 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -4,8 +4,10 @@
#include "simple/helpers.h"
#include "simple/query_id.h"
#include "simple/settings.h"
-#include "events/process_response.h"
+#include "simple/services.h"
+#include "events/events.h"
#include "compilation/result.h"
+#include "shutdown/controller.h"
#include <library/cpp/lwtrace/shuttle.h>
@@ -22,8 +24,7 @@
#include <util/generic/guid.h>
#include <util/generic/ptr.h>
-namespace NKikimr {
-namespace NKqp {
+namespace NKikimr::NKqp {
void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);
@@ -35,631 +36,6 @@ inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
}
}
-const TStringBuf DefaultKikimrPublicClusterName = "db";
-
-inline NActors::TActorId MakeKqpProxyID(ui32 nodeId) {
- const char name[12] = "kqp_proxy";
- return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
-inline NActors::TActorId MakeKqpCompileServiceID(ui32 nodeId) {
- const char name[12] = "kqp_compile";
- return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
-inline NActors::TActorId MakeKqpResourceManagerServiceID(ui32 nodeId) {
- const char name[12] = "kqp_resman";
- return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
-inline NActors::TActorId MakeKqpRmServiceID(ui32 nodeId) {
- const char name[12] = "kqp_rm";
- return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
-inline NActors::TActorId MakeKqpNodeServiceID(ui32 nodeId) {
- const char name[12] = "kqp_node";
- return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
-inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) {
- const char name[12] = "kqp_lfspill";
- return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
-class TKqpShutdownController;
-
-class TKqpShutdownState : public TThrRefBase {
- friend class TKqpShutdownController;
-
-public:
- void Update(ui32 pendingSessions) {
- AtomicSet(PendingSessions_, pendingSessions);
-
- if (!Initialized()) {
- AtomicSet(Initialized_, 1);
- }
-
- if (!pendingSessions) {
- SetCompleted();
- }
- }
-private:
- bool ShutdownComplete() const {
- return AtomicGet(ShutdownComplete_) == 1;
- }
-
- ui32 GetPendingSessions() const {
- return AtomicGet(PendingSessions_);
- }
-
- bool Initialized() const {
- return AtomicGet(Initialized_) == 1;
- }
-
- void SetCompleted() {
- AtomicSet(ShutdownComplete_, 1);
- }
-
- TAtomic PendingSessions_ = 0;
- TAtomic Initialized_ = 0;
- TAtomic ShutdownComplete_ = 0;
-};
-
-
-class TKqpShutdownController {
-public:
- TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool gracefulEnabled);
- ~TKqpShutdownController() = default;
-
- void Initialize(NActors::TActorSystem* actorSystem);
- void Stop();
-
-private:
- NActors::TActorId KqpProxyActorId_;
- NActors::TActorSystem* ActorSystem_;
- bool EnableGraceful;
- NKikimrConfig::TTableServiceConfig TableServiceConfig;
- TIntrusivePtr<TKqpShutdownState> ShutdownState_;
-};
-
-struct TEvKqp {
- struct TEvQueryRequestRemote : public TEventPB<TEvQueryRequestRemote, NKikimrKqp::TEvQueryRequest,
- TKqpEvents::EvQueryRequest> {};
-
- using TEvProcessResponse = NPrivateEvents::TEvProcessResponse;
-
- struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
- public:
- TEvQueryRequest(
- NKikimrKqp::EQueryAction queryAction,
- NKikimrKqp::EQueryType queryType,
- TActorId requestActorId,
- const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx,
- const TString& sessionId,
- TString&& yqlText,
- TString&& queryId,
- const ::Ydb::Table::TransactionControl* txControl,
- const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters,
- const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
- const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
- const ::Ydb::Operations::OperationParams* operationParams,
- bool keepSession = false);
-
- TEvQueryRequest() = default;
-
- bool IsSerializable() const override {
- return true;
- }
-
- TEventSerializationInfo CreateSerializationInfo() const override { return {}; }
-
- const TString& GetDatabase() const {
- return RequestCtx ? Database : Record.GetRequest().GetDatabase();
- }
-
- bool HasYdbStatus() const {
- return RequestCtx ? false : Record.HasYdbStatus();
- }
-
- const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
- return Record.GetRequest().GetTopicOperations();
- }
-
- bool HasTopicOperations() const {
- return Record.GetRequest().HasTopicOperations();
- }
-
- bool GetKeepSession() const {
- return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession();
- }
-
- TDuration GetCancelAfter() const {
- return RequestCtx ? CancelAfter : TDuration::MilliSeconds(Record.GetRequest().GetCancelAfterMs());
- }
-
- TDuration GetOperationTimeout() const {
- return RequestCtx ? OperationTimeout : TDuration::MilliSeconds(Record.GetRequest().GetTimeoutMs());
- }
-
- bool HasAction() const {
- return RequestCtx ? true : Record.GetRequest().HasAction();
- }
-
- void SetSessionId(const TString& sessionId) {
- if (RequestCtx) {
- SessionId = sessionId;
- } else {
- Record.MutableRequest()->SetSessionId(sessionId);
- }
- }
-
- const TString& GetSessionId() const {
- return RequestCtx ? SessionId : Record.GetRequest().GetSessionId();
- }
-
- NKikimrKqp::EQueryAction GetAction() const {
- return RequestCtx ? QueryAction : Record.GetRequest().GetAction();
- }
-
- NKikimrKqp::EQueryType GetType() const {
- return RequestCtx ? QueryType : Record.GetRequest().GetType();
- }
-
- bool HasPreparedQuery() const {
- return RequestCtx ? QueryId.size() > 0 : Record.GetRequest().HasPreparedQuery();
- }
-
- const TString& GetPreparedQuery() const {
- return RequestCtx ? QueryId : Record.GetRequest().GetPreparedQuery();
- }
-
- const TString& GetQuery() const {
- return RequestCtx ? YqlText : Record.GetRequest().GetQuery();
- }
-
- const ::NKikimrMiniKQL::TParams& GetParameters() const {
- return Record.GetRequest().GetParameters();
- }
-
- const ::Ydb::Table::TransactionControl& GetTxControl() const {
- return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl();
- }
-
- bool GetUsePublicResponseDataFormat() const {
- return RequestCtx ? true : Record.GetRequest().GetUsePublicResponseDataFormat();
- }
-
- bool GetQueryKeepInCache() const {
- if (RequestCtx) {
- if (QueryCachePolicy != nullptr) {
- return QueryCachePolicy->keep_in_cache();
- }
- return false;
- }
- return Record.GetRequest().GetQueryCachePolicy().keep_in_cache();
- }
-
- bool HasTxControl() const {
- return RequestCtx ? TxControl != nullptr : Record.GetRequest().HasTxControl();
- }
-
- bool HasCollectStats() const {
- return RequestCtx ? true : Record.GetRequest().HasCollectStats();
- }
-
- TActorId GetRequestActorId() const {
- return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId());
- }
-
- google::protobuf::Arena* GetArena() {
- return RequestCtx ? RequestCtx->GetArena() : nullptr;
- }
-
- const TString& GetTraceId() const {
- if (RequestCtx) {
- if (!TraceId) {
- TraceId = RequestCtx->GetTraceId().GetOrElse("");
- }
- return TraceId;
- }
-
- return Record.GetTraceId();
- }
-
- const TString& GetRequestType() const {
- if (RequestCtx) {
- if (!RequestType) {
- RequestType = RequestCtx->GetRequestType().GetOrElse("");
- }
- return RequestType;
- }
-
- return Record.GetRequestType();
- }
-
- const TIntrusiveConstPtr<NACLib::TUserToken>& GetUserToken() const {
- if (RequestCtx && RequestCtx->GetInternalToken()) {
- return RequestCtx->GetInternalToken();
- }
-
- if (Token_) {
- return Token_;
- }
-
- Token_ = new NACLib::TUserToken(Record.GetUserToken());
- return Token_;
- }
-
- const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
- if (YdbParameters) {
- return *YdbParameters;
- }
-
- return Record.GetRequest().GetYdbParameters();
- }
-
- Ydb::StatusIds::StatusCode GetYdbStatus() const {
- return Record.GetYdbStatus();
- }
-
- ::Ydb::Table::QueryStatsCollection::Mode GetCollectStats() const {
- if (RequestCtx) {
- return CollectStats;
- }
-
- return Record.GetRequest().GetCollectStats();
- }
-
- const ::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& GetQueryIssues() const {
- return Record.GetQueryIssues();
- }
-
- ui64 GetRequestSize() const {
- return Record.GetRequest().ByteSizeLong();
- }
-
- ui64 GetQuerySize() const {
- return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size();
- }
-
- bool IsInternalCall() const {
- return RequestCtx ? RequestCtx->IsInternalCall() : Record.GetRequest().GetIsInternalCall();
- }
-
- ui64 GetParametersSize() const {
- if (ParametersSize > 0) {
- return ParametersSize;
- }
-
- ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong();
- for(const auto& [name, param]: GetYdbParameters()) {
- ParametersSize += name.size();
- ParametersSize += param.ByteSizeLong();
- }
-
- return ParametersSize;
- }
-
- ui32 CalculateSerializedSize() const override {
- PrepareRemote();
- return Record.ByteSize();
- }
-
- bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override {
- PrepareRemote();
- return Record.SerializeToZeroCopyStream(chunker);
- }
-
- static NActors::IEventBase* Load(TEventSerializedData* data) {
- auto pbEv = THolder<TEvQueryRequestRemote>(static_cast<TEvQueryRequestRemote*>(TEvQueryRequestRemote::Load(data)));
- auto req = new TEvQueryRequest();
- req->Record.Swap(&pbEv->Record);
- return req;
- }
-
- void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) {
- if (RequestCtx) {
- RequestCtx->SetClientLostAction([actorId, as]() {
- as->Send(actorId, new NGRpcService::TEvClientLost());
- });
- } else if (Record.HasCancelationActor()) {
- auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor());
- NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, as);
- }
- }
-
- void PrepareRemote() const;
-
- mutable NKikimrKqp::TEvQueryRequest Record;
-
- private:
- mutable ui64 ParametersSize = 0;
- mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx;
- mutable TString TraceId;
- mutable TString RequestType;
- mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
- TActorId RequestActorId;
- TString Database;
- TString SessionId;
- TString YqlText;
- TString QueryId;
- NKikimrKqp::EQueryAction QueryAction;
- NKikimrKqp::EQueryType QueryType;
- const ::Ydb::Table::TransactionControl* TxControl = nullptr;
- const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr;
- const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
- const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr;
- const ::Ydb::Operations::OperationParams* OperationParams = nullptr;
- bool KeepSession = false;
- TDuration OperationTimeout;
- TDuration CancelAfter;
- };
-
- struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest,
- NKikimrKqp::TEvCloseSessionRequest, TKqpEvents::EvCloseSessionRequest> {};
-
- struct TEvCreateSessionRequest : public TEventPB<TEvCreateSessionRequest,
- NKikimrKqp::TEvCreateSessionRequest, TKqpEvents::EvCreateSessionRequest> {};
-
- struct TEvPingSessionRequest : public TEventPB<TEvPingSessionRequest,
- NKikimrKqp::TEvPingSessionRequest, TKqpEvents::EvPingSessionRequest> {};
-
- struct TEvInitiateSessionShutdown : public TEventLocal<TEvInitiateSessionShutdown, TKqpEvents::EvInitiateSessionShutdown> {
- ui32 SoftTimeoutMs;
- ui32 HardTimeoutMs;
-
- TEvInitiateSessionShutdown(ui32 softTimeoutMs, ui32 hardTimeoutMs)
- : SoftTimeoutMs(softTimeoutMs)
- , HardTimeoutMs(hardTimeoutMs)
- {}
- };
-
- struct TEvContinueShutdown : public TEventLocal<TEvContinueShutdown, TKqpEvents::EvContinueShutdown> {};
-
- struct TEvDataQueryStreamPart : public TEventPB<TEvDataQueryStreamPart,
- NKikimrKqp::TEvDataQueryStreamPart, TKqpEvents::EvDataQueryStreamPart> {};
-
- struct TEvDataQueryStreamPartAck : public TEventLocal<TEvDataQueryStreamPartAck, TKqpEvents::EvDataQueryStreamPartAck> {};
-
- // Wrapper to use Arena allocated protobuf with ActorSystem (for serialization path).
- // Arena deserialization is not supported.
- // TODO: Add arena support to actor system TEventPB?
- template<typename TProto>
- class TProtoArenaHolder : public TNonCopyable {
- public:
- TProtoArenaHolder()
- : Protobuf_(google::protobuf::Arena::CreateMessage<TProto>(nullptr))
- , NeedDelete_(true)
- {}
-
- ~TProtoArenaHolder() {
- // Deallocate message only if it was "normal" allocation
- // In case of protobuf arena memory will be freed during arena deallocation
- if (NeedDelete_) {
- delete Protobuf_;
- }
- }
-
- void Realloc(std::shared_ptr<google::protobuf::Arena> arena) {
- ReallocRef(arena.get());
- Arena_ = arena;
- }
-
- void ReallocRef(google::protobuf::Arena* arena) {
- // Allow realloc only if previous allocation was made using "normal" allocator
- // and no data was writen. It prevents ineffective using of protobuf.
- Y_ASSERT(!Protobuf_->GetArena());
- Y_ASSERT(ByteSize() == 0);
- delete Protobuf_;
- Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena);
- if (arena) {
- NeedDelete_ = false;
- }
- }
-
- bool ParseFromString(const TString& data) {
- return Protobuf_->ParseFromString(data);
- }
-
- bool ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream* input) {
- return Protobuf_->ParseFromZeroCopyStream(input);
- }
-
- bool SerializeToZeroCopyStream(google::protobuf::io::ZeroCopyOutputStream* output) const {
- return Protobuf_->SerializeToZeroCopyStream(output);
- }
-
- bool SerializeToString(TString* output) const {
- return Protobuf_->SerializeToString(output);
- }
-
- int ByteSize() const {
- return Protobuf_->ByteSize();
- }
-
- TString DebugString() const {
- return Protobuf_->DebugString();
- }
-
- TString ShortDebugString() const {
- return Protobuf_->ShortDebugString();
- }
-
- TString GetTypeName() const {
- return Protobuf_->GetTypeName();
- }
-
- const TProto& GetRef() const {
- return *Protobuf_;
- }
-
- TProto& GetRef() {
- return *Protobuf_;
- }
-
- private:
- TProtoArenaHolder(TProtoArenaHolder&&) = default;
- TProtoArenaHolder& operator=(TProtoArenaHolder&&) = default;
- TProto* Protobuf_;
- std::shared_ptr<google::protobuf::Arena> Arena_;
- bool NeedDelete_;
- };
-
- struct TEvQueryResponse : public TEventPB<TEvQueryResponse, TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>,
- TKqpEvents::EvQueryResponse> {};
-
- struct TEvCreateSessionResponse : public TEventPB<TEvCreateSessionResponse,
- NKikimrKqp::TEvCreateSessionResponse, TKqpEvents::EvCreateSessionResponse> {};
-
- struct TEvContinueProcess : public TEventLocal<TEvContinueProcess, TKqpEvents::EvContinueProcess> {
- TEvContinueProcess(ui32 queryId, bool finished)
- : QueryId(queryId)
- , Finished(finished) {}
-
- ui32 QueryId;
- bool Finished;
- };
-
- struct TEvQueryTimeout : public TEventLocal<TEvQueryTimeout, TKqpEvents::EvQueryTimeout> {
- TEvQueryTimeout(ui32 queryId)
- : QueryId(queryId) {}
-
- ui32 QueryId;
- };
-
- struct TEvIdleTimeout : public TEventLocal<TEvIdleTimeout, TKqpEvents::EvIdleTimeout> {
- TEvIdleTimeout(ui32 timerId)
- : TimerId(timerId) {}
-
- ui32 TimerId;
- };
-
- struct TEvCloseSessionResponse : public TEventPB<TEvCloseSessionResponse,
- NKikimrKqp::TEvCloseSessionResponse, TKqpEvents::EvCloseSessionResponse> {};
-
- struct TEvPingSessionResponse : public TEventPB<TEvPingSessionResponse,
- NKikimrKqp::TEvPingSessionResponse, TKqpEvents::EvPingSessionResponse> {};
-
- struct TEvCompileRequest : public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
- TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query,
- bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
- : UserToken(userToken)
- , Uid(uid)
- , Query(std::move(query))
- , KeepInCache(keepInCache)
- , Deadline(deadline)
- , DbCounters(dbCounters)
- , Orbit(std::move(orbit))
- {
- Y_ENSURE(Uid.Defined() != Query.Defined());
- }
-
- TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
- TMaybe<TString> Uid;
- TMaybe<TKqpQueryId> Query;
- bool KeepInCache = false;
- // it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
- TInstant Deadline;
- TKqpDbCountersPtr DbCounters;
- TMaybe<bool> DocumentApiRestricted;
-
- NLWTrace::TOrbit Orbit;
- };
-
- struct TEvRecompileRequest : public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
- TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query,
- TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {})
- : UserToken(userToken)
- , Uid(uid)
- , Query(query)
- , Deadline(deadline)
- , DbCounters(dbCounters)
- , Orbit(std::move(orbit)) {}
-
- TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
- TString Uid;
- TMaybe<TKqpQueryId> Query;
-
- TInstant Deadline;
- TKqpDbCountersPtr DbCounters;
-
- NLWTrace::TOrbit Orbit;
- };
-
- struct TEvCompileResponse : public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
- TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {})
- : CompileResult(compileResult)
- , Orbit(std::move(orbit)) {}
-
- TKqpCompileResult::TConstPtr CompileResult;
- NKqpProto::TKqpStatsCompile Stats;
- std::optional<TString> ReplayMessage;
-
- NLWTrace::TOrbit Orbit;
- };
-
- struct TEvKqpProxyPublishRequest :
- public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {};
-
- struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest,
- TKqpEvents::EvCompileInvalidateRequest>
- {
- TEvCompileInvalidateRequest(const TString& uid, TKqpDbCountersPtr dbCounters)
- : Uid(uid)
- , DbCounters(dbCounters) {}
-
- TString Uid;
- TKqpDbCountersPtr DbCounters;
- };
-
- struct TEvInitiateShutdownRequest : public TEventLocal<TEvInitiateShutdownRequest, TKqpEvents::EvInitiateShutdownRequest> {
- TIntrusivePtr<TKqpShutdownState> ShutdownState;
-
- TEvInitiateShutdownRequest(TIntrusivePtr<TKqpShutdownState> ShutdownState)
- : ShutdownState(ShutdownState)
- {}
- };
-
- struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
- TEvScriptRequest() = default;
-
- mutable NKikimrKqp::TEvQueryRequest Record;
- };
-
- struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> {
- TEvScriptResponse(TString operationId, TString executionId, Ydb::Query::ExecStatus execStatus, Ydb::Query::ExecMode execMode)
- : Status(Ydb::StatusIds::SUCCESS)
- , OperationId(std::move(operationId))
- , ExecutionId(std::move(executionId))
- , ExecStatus(execStatus)
- , ExecMode(execMode)
- {}
-
- TEvScriptResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
- : Status(status)
- , Issues(std::move(issues))
- , ExecStatus(Ydb::Query::EXEC_STATUS_FAILED)
- , ExecMode(Ydb::Query::EXEC_MODE_UNSPECIFIED)
- {}
-
- const Ydb::StatusIds::StatusCode Status;
- const NYql::TIssues Issues;
- const TString OperationId;
- const TString ExecutionId;
- const Ydb::Query::ExecStatus ExecStatus;
- const Ydb::Query::ExecMode ExecMode;
- };
-
- using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution;
-
- struct TEvFetchScriptResultsRequest : public TEventPB<TEvFetchScriptResultsRequest, NKikimrKqp::TEvFetchScriptResultsRequest, TKqpEvents::EvFetchScriptResultsRequest> {
- };
-
- struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> {
- };
-};
-
class TKqpRequestInfo {
public:
TKqpRequestInfo(const TString& traceId, const TString& sessionId)
@@ -743,5 +119,4 @@ static inline IOutputStream& operator<<(IOutputStream& stream, const TKqpRequest
return stream;
}
-} // namespace NKqp
-} // namespace NKikimr
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/kqp_event_ids.h b/ydb/core/kqp/common/kqp_event_ids.h
index 491443c69f8..24ada529f59 100644
--- a/ydb/core/kqp/common/kqp_event_ids.h
+++ b/ydb/core/kqp/common/kqp_event_ids.h
@@ -1,3 +1,3 @@
#pragma once
-#include "events/kqp_event_ids.h"
+#include "simple/kqp_event_ids.h"
diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..11cb4459e7d
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(kqp-common-shutdown)
+target_compile_options(kqp-common-shutdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(kqp-common-shutdown PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+)
+target_sources(kqp-common-shutdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp
+)
diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..8996ef48176
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(kqp-common-shutdown)
+target_compile_options(kqp-common-shutdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(kqp-common-shutdown PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+)
+target_sources(kqp-common-shutdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp
+)
diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..8996ef48176
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(kqp-common-shutdown)
+target_compile_options(kqp-common-shutdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(kqp-common-shutdown PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+)
+target_sources(kqp-common-shutdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp
+)
diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.txt b/ydb/core/kqp/common/shutdown/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..11cb4459e7d
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(kqp-common-shutdown)
+target_compile_options(kqp-common-shutdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(kqp-common-shutdown PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+)
+target_sources(kqp-common-shutdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp
+)
diff --git a/ydb/core/kqp/common/shutdown/controller.cpp b/ydb/core/kqp/common/shutdown/controller.cpp
new file mode 100644
index 00000000000..a6c1bd609e2
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/controller.cpp
@@ -0,0 +1,45 @@
+#include "controller.h"
+#include "events.h"
+
+namespace NKikimr::NKqp {
+
+TKqpShutdownController::TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool enableGraceful)
+ : KqpProxyActorId_(kqpProxyActorId)
+ , EnableGraceful(enableGraceful)
+ , TableServiceConfig(tableServiceConfig) {
+ ShutdownState_.Reset(new TKqpShutdownState());
+}
+
+void TKqpShutdownController::Initialize(NActors::TActorSystem* actorSystem) {
+ ActorSystem_ = actorSystem;
+}
+
+void TKqpShutdownController::Stop() {
+ if (!EnableGraceful)
+ return;
+
+ ActorSystem_->Send(new NActors::IEventHandle(KqpProxyActorId_, {}, new NPrivateEvents::TEvInitiateShutdownRequest(ShutdownState_)));
+ auto timeout = TDuration::MilliSeconds(TableServiceConfig.GetShutdownSettings().GetShutdownTimeoutMs());
+ auto startedAt = TInstant::Now();
+ auto spent = (TInstant::Now() - startedAt).SecondsFloat();
+ ui32 iteration = 0;
+ while (spent < timeout.SecondsFloat()) {
+ if (iteration % 30 == 0) {
+ Cerr << "Current KQP shutdown state: spent " << spent << " seconds, ";
+ if (ShutdownState_->Initialized()) {
+ Cerr << ShutdownState_->GetPendingSessions() << " sessions to shutdown" << Endl;
+ } else {
+ Cerr << "not started yet" << Endl;
+ }
+ }
+
+ if (ShutdownState_->ShutdownComplete())
+ break;
+
+ Sleep(TDuration::MilliSeconds(100));
+ ++iteration;
+ spent = (TInstant::Now() - startedAt).SecondsFloat();
+ }
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/shutdown/controller.h b/ydb/core/kqp/common/shutdown/controller.h
new file mode 100644
index 00000000000..91e7ac5519c
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/controller.h
@@ -0,0 +1,26 @@
+#pragma once
+#include "state.h"
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <ydb/core/protos/config.pb.h>
+
+namespace NKikimr::NKqp {
+
+class TKqpShutdownController {
+public:
+ TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool gracefulEnabled);
+ ~TKqpShutdownController() = default;
+
+ void Initialize(NActors::TActorSystem* actorSystem);
+ void Stop();
+
+private:
+ NActors::TActorId KqpProxyActorId_;
+ NActors::TActorSystem* ActorSystem_;
+ bool EnableGraceful;
+ NKikimrConfig::TTableServiceConfig TableServiceConfig;
+ TIntrusivePtr<TKqpShutdownState> ShutdownState_;
+};
+
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/shutdown/events.cpp b/ydb/core/kqp/common/shutdown/events.cpp
new file mode 100644
index 00000000000..4a55f64db54
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/events.cpp
@@ -0,0 +1,5 @@
+#include "events.h"
+
+namespace NKikimr::NKqp {
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/shutdown/events.h b/ydb/core/kqp/common/shutdown/events.h
new file mode 100644
index 00000000000..46371901723
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/events.h
@@ -0,0 +1,28 @@
+#pragma once
+#include "state.h"
+#include <library/cpp/actors/core/event_local.h>
+#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
+
+namespace NKikimr::NKqp::NPrivateEvents {
+
+struct TEvInitiateSessionShutdown: public TEventLocal<TEvInitiateSessionShutdown, TKqpEvents::EvInitiateSessionShutdown> {
+ ui32 SoftTimeoutMs;
+ ui32 HardTimeoutMs;
+
+ TEvInitiateSessionShutdown(ui32 softTimeoutMs, ui32 hardTimeoutMs)
+ : SoftTimeoutMs(softTimeoutMs)
+ , HardTimeoutMs(hardTimeoutMs) {
+ }
+};
+
+struct TEvContinueShutdown: public TEventLocal<TEvContinueShutdown, TKqpEvents::EvContinueShutdown> {};
+
+struct TEvInitiateShutdownRequest: public TEventLocal<TEvInitiateShutdownRequest, TKqpEvents::EvInitiateShutdownRequest> {
+ TIntrusivePtr<TKqpShutdownState> ShutdownState;
+
+ TEvInitiateShutdownRequest(TIntrusivePtr<TKqpShutdownState> ShutdownState)
+ : ShutdownState(ShutdownState) {
+ }
+};
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/shutdown/state.cpp b/ydb/core/kqp/common/shutdown/state.cpp
new file mode 100644
index 00000000000..82a66253b74
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/state.cpp
@@ -0,0 +1,17 @@
+#include "state.h"
+
+namespace NKikimr::NKqp {
+
+void TKqpShutdownState::Update(ui32 pendingSessions) {
+ AtomicSet(PendingSessions_, pendingSessions);
+
+ if (!Initialized()) {
+ AtomicSet(Initialized_, 1);
+ }
+
+ if (!pendingSessions) {
+ SetCompleted();
+ }
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/shutdown/state.h b/ydb/core/kqp/common/shutdown/state.h
new file mode 100644
index 00000000000..515a6999c20
--- /dev/null
+++ b/ydb/core/kqp/common/shutdown/state.h
@@ -0,0 +1,36 @@
+#pragma once
+#include <util/generic/ptr.h>
+#include <library/cpp/deprecated/atomic/atomic.h>
+
+namespace NKikimr::NKqp {
+
+class TKqpShutdownController;
+
+class TKqpShutdownState: public TThrRefBase {
+ friend class TKqpShutdownController;
+
+public:
+ void Update(ui32 pendingSessions);
+private:
+ bool ShutdownComplete() const {
+ return AtomicGet(ShutdownComplete_) == 1;
+ }
+
+ ui32 GetPendingSessions() const {
+ return AtomicGet(PendingSessions_);
+ }
+
+ bool Initialized() const {
+ return AtomicGet(Initialized_) == 1;
+ }
+
+ void SetCompleted() {
+ AtomicSet(ShutdownComplete_, 1);
+ }
+
+ TAtomic PendingSessions_ = 0;
+ TAtomic Initialized_ = 0;
+ TAtomic ShutdownComplete_ = 0;
+};
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt
index 28c9e36787d..6833c55e76a 100644
--- a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt
@@ -12,9 +12,13 @@ target_link_libraries(kqp-common-simple PUBLIC
contrib-libs-cxxsupp
yutil
ydb-core-protos
+ yql-dq-actors
+ ydb-core-base
)
target_sources(kqp-common-simple PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp
)
diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt
index 0c8ad81844e..f9217daae45 100644
--- a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt
@@ -13,9 +13,13 @@ target_link_libraries(kqp-common-simple PUBLIC
contrib-libs-cxxsupp
yutil
ydb-core-protos
+ yql-dq-actors
+ ydb-core-base
)
target_sources(kqp-common-simple PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp
)
diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt
index 0c8ad81844e..f9217daae45 100644
--- a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt
@@ -13,9 +13,13 @@ target_link_libraries(kqp-common-simple PUBLIC
contrib-libs-cxxsupp
yutil
ydb-core-protos
+ yql-dq-actors
+ ydb-core-base
)
target_sources(kqp-common-simple PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp
)
diff --git a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt
index 28c9e36787d..6833c55e76a 100644
--- a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt
@@ -12,9 +12,13 @@ target_link_libraries(kqp-common-simple PUBLIC
contrib-libs-cxxsupp
yutil
ydb-core-protos
+ yql-dq-actors
+ ydb-core-base
)
target_sources(kqp-common-simple PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp
)
diff --git a/ydb/core/kqp/common/events/kqp_event_ids.cpp b/ydb/core/kqp/common/simple/kqp_event_ids.cpp
index 593199fb384..593199fb384 100644
--- a/ydb/core/kqp/common/events/kqp_event_ids.cpp
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.cpp
diff --git a/ydb/core/kqp/common/events/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index 9723277a5cb..9723277a5cb 100644
--- a/ydb/core/kqp/common/events/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
diff --git a/ydb/core/kqp/common/simple/services.cpp b/ydb/core/kqp/common/simple/services.cpp
new file mode 100644
index 00000000000..2a7a17627d4
--- /dev/null
+++ b/ydb/core/kqp/common/simple/services.cpp
@@ -0,0 +1,5 @@
+#include "services.h"
+
+namespace NKikimr::NKqp {
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h
new file mode 100644
index 00000000000..3d598c95d67
--- /dev/null
+++ b/ydb/core/kqp/common/simple/services.h
@@ -0,0 +1,39 @@
+#pragma once
+#include <util/generic/strbuf.h>
+#include <library/cpp/actors/core/actorid.h>
+
+namespace NKikimr::NKqp {
+
+const TStringBuf DefaultKikimrPublicClusterName = "db";
+
+inline NActors::TActorId MakeKqpProxyID(ui32 nodeId) {
+ const char name[12] = "kqp_proxy";
+ return NActors::TActorId(nodeId, TStringBuf(name, 12));
+}
+
+inline NActors::TActorId MakeKqpCompileServiceID(ui32 nodeId) {
+ const char name[12] = "kqp_compile";
+ return NActors::TActorId(nodeId, TStringBuf(name, 12));
+}
+
+inline NActors::TActorId MakeKqpResourceManagerServiceID(ui32 nodeId) {
+ const char name[12] = "kqp_resman";
+ return NActors::TActorId(nodeId, TStringBuf(name, 12));
+}
+
+inline NActors::TActorId MakeKqpRmServiceID(ui32 nodeId) {
+ const char name[12] = "kqp_rm";
+ return NActors::TActorId(nodeId, TStringBuf(name, 12));
+}
+
+inline NActors::TActorId MakeKqpNodeServiceID(ui32 nodeId) {
+ const char name[12] = "kqp_node";
+ return NActors::TActorId(nodeId, TStringBuf(name, 12));
+}
+
+inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) {
+ const char name[12] = "kqp_lfspill";
+ return NActors::TActorId(nodeId, TStringBuf(name, 12));
+}
+
+} // namespace NKikimr::NKqp