diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-04 16:14:13 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-04 16:14:13 +0300 |
commit | 6b1bed39e996ecde5d9637b97612d7c4f36467f3 (patch) | |
tree | 57f784dbc9386690f4811df668a6fd0f81e53e05 | |
parent | 3b3c4be949271feb82f21388e31fb56cb7666062 (diff) | |
download | ydb-6b1bed39e996ecde5d9637b97612d7c4f36467f3.tar.gz |
split kqp common
42 files changed, 996 insertions, 677 deletions
diff --git a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt index 35587b33802..c40b6b7675a 100644 --- a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(compilation) add_subdirectory(events) +add_subdirectory(shutdown) add_subdirectory(simple) get_built_tool_path( TOOL_enum_parser_bin @@ -35,6 +36,7 @@ target_link_libraries(core-kqp-common PUBLIC kqp-common-simple kqp-common-compilation kqp-common-events + kqp-common-shutdown core-kqp-provider tx-long_tx_service-public core-tx-sharding diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt index a680d797473..c469afdc8b0 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt @@ -8,6 +8,7 @@ add_subdirectory(compilation) add_subdirectory(events) +add_subdirectory(shutdown) add_subdirectory(simple) get_built_tool_path( TOOL_enum_parser_bin @@ -36,6 +37,7 @@ target_link_libraries(core-kqp-common PUBLIC kqp-common-simple kqp-common-compilation kqp-common-events + kqp-common-shutdown core-kqp-provider tx-long_tx_service-public core-tx-sharding diff --git a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt index a680d797473..c469afdc8b0 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(compilation) add_subdirectory(events) +add_subdirectory(shutdown) add_subdirectory(simple) get_built_tool_path( TOOL_enum_parser_bin @@ -36,6 +37,7 @@ target_link_libraries(core-kqp-common PUBLIC kqp-common-simple kqp-common-compilation kqp-common-events + kqp-common-shutdown core-kqp-provider tx-long_tx_service-public core-tx-sharding diff --git a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt index 35587b33802..c40b6b7675a 100644 --- a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(compilation) add_subdirectory(events) +add_subdirectory(shutdown) add_subdirectory(simple) get_built_tool_path( TOOL_enum_parser_bin @@ -35,6 +36,7 @@ target_link_libraries(core-kqp-common PUBLIC kqp-common-simple kqp-common-compilation kqp-common-events + kqp-common-shutdown core-kqp-provider tx-long_tx_service-public core-tx-sharding diff --git a/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt index 23acfee25cc..f959492fd80 100644 --- a/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/compilation/CMakeLists.darwin-x86_64.txt @@ -20,4 +20,5 @@ target_link_libraries(kqp-common-compilation PUBLIC ) target_sources(kqp-common-compilation PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp ) diff --git a/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt index 905a8b9a14c..8145ada4823 100644 --- a/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/compilation/CMakeLists.linux-aarch64.txt @@ -21,4 +21,5 @@ target_link_libraries(kqp-common-compilation PUBLIC ) target_sources(kqp-common-compilation PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp ) diff --git a/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt index 905a8b9a14c..8145ada4823 100644 --- a/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/compilation/CMakeLists.linux-x86_64.txt @@ -21,4 +21,5 @@ target_link_libraries(kqp-common-compilation PUBLIC ) target_sources(kqp-common-compilation PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp ) diff --git a/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt index 23acfee25cc..f959492fd80 100644 --- a/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/common/compilation/CMakeLists.windows-x86_64.txt @@ -20,4 +20,5 @@ target_link_libraries(kqp-common-compilation PUBLIC ) target_sources(kqp-common-compilation PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/result.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/compilation/events.cpp ) diff --git a/ydb/core/kqp/common/compilation/events.cpp b/ydb/core/kqp/common/compilation/events.cpp new file mode 100644 index 00000000000..4a55f64db54 --- /dev/null +++ b/ydb/core/kqp/common/compilation/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NKqp { + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h new file mode 100644 index 00000000000..c9678caecb1 --- /dev/null +++ b/ydb/core/kqp/common/compilation/events.h @@ -0,0 +1,82 @@ +#pragma once +#include "result.h" + +#include <library/cpp/actors/core/event_local.h> +#include <ydb/library/aclib/aclib.h> +#include <ydb/core/kqp/common/simple/kqp_event_ids.h> +#include <ydb/core/kqp/common/simple/query_id.h> +#include <ydb/core/kqp/counters/kqp_counters.h> + +namespace NKikimr::NKqp::NPrivateEvents { + +struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> { + TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query, + bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) + : UserToken(userToken) + , Uid(uid) + , Query(std::move(query)) + , KeepInCache(keepInCache) + , Deadline(deadline) + , DbCounters(dbCounters) + , Orbit(std::move(orbit)) { + Y_ENSURE(Uid.Defined() != Query.Defined()); + } + + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TMaybe<TString> Uid; + TMaybe<TKqpQueryId> Query; + bool KeepInCache = false; + // it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration) + TInstant Deadline; + TKqpDbCountersPtr DbCounters; + TMaybe<bool> DocumentApiRestricted; + + NLWTrace::TOrbit Orbit; +}; + +struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> { + TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query, + TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) + : UserToken(userToken) + , Uid(uid) + , Query(query) + , Deadline(deadline) + , DbCounters(dbCounters) + , Orbit(std::move(orbit)) { + } + + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TString Uid; + TMaybe<TKqpQueryId> Query; + + TInstant Deadline; + TKqpDbCountersPtr DbCounters; + + NLWTrace::TOrbit Orbit; +}; + +struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> { + TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {}) + : CompileResult(compileResult) + , Orbit(std::move(orbit)) { + } + + TKqpCompileResult::TConstPtr CompileResult; + NKqpProto::TKqpStatsCompile Stats; + std::optional<TString> ReplayMessage; + + NLWTrace::TOrbit Orbit; +}; + +struct TEvCompileInvalidateRequest: public TEventLocal<TEvCompileInvalidateRequest, + TKqpEvents::EvCompileInvalidateRequest> { + TEvCompileInvalidateRequest(const TString& uid, TKqpDbCountersPtr dbCounters) + : Uid(uid) + , DbCounters(dbCounters) { + } + + TString Uid; + TKqpDbCountersPtr DbCounters; +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/compilation/result.cpp b/ydb/core/kqp/common/compilation/result.cpp index 138843cf36b..292cd0d03c4 100644 --- a/ydb/core/kqp/common/compilation/result.cpp +++ b/ydb/core/kqp/common/compilation/result.cpp @@ -1,5 +1,4 @@ #include "result.h" -#include <ydb/core/kqp/query_data/kqp_prepared_query.h> namespace NKikimr::NKqp { diff --git a/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt index 2e48fa7671d..219ea9df7fb 100644 --- a/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/events/CMakeLists.darwin-x86_64.txt @@ -16,8 +16,16 @@ target_link_libraries(kqp-common-events PUBLIC yutil ydb-core-protos ydb-core-base + core-grpc_services-base + core-grpc_services-cancelation + kqp-common-shutdown + kqp-common-compilation + yql-dq-actors + api-protos + cpp-actors-core ) target_sources(kqp-common-events PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp ) diff --git a/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt index 1013ee8365b..f01f66089fe 100644 --- a/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/events/CMakeLists.linux-aarch64.txt @@ -17,8 +17,16 @@ target_link_libraries(kqp-common-events PUBLIC yutil ydb-core-protos ydb-core-base + core-grpc_services-base + core-grpc_services-cancelation + kqp-common-shutdown + kqp-common-compilation + yql-dq-actors + api-protos + cpp-actors-core ) target_sources(kqp-common-events PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp ) diff --git a/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt index 1013ee8365b..f01f66089fe 100644 --- a/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/events/CMakeLists.linux-x86_64.txt @@ -17,8 +17,16 @@ target_link_libraries(kqp-common-events PUBLIC yutil ydb-core-protos ydb-core-base + core-grpc_services-base + core-grpc_services-cancelation + kqp-common-shutdown + kqp-common-compilation + yql-dq-actors + api-protos + cpp-actors-core ) target_sources(kqp-common-events PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp ) diff --git a/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt index 2e48fa7671d..219ea9df7fb 100644 --- a/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/common/events/CMakeLists.windows-x86_64.txt @@ -16,8 +16,16 @@ target_link_libraries(kqp-common-events PUBLIC yutil ydb-core-protos ydb-core-base + core-grpc_services-base + core-grpc_services-cancelation + kqp-common-shutdown + kqp-common-compilation + yql-dq-actors + api-protos + cpp-actors-core ) target_sources(kqp-common-events PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/process_response.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/kqp_event_ids.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/events/query.cpp ) diff --git a/ydb/core/kqp/common/events/events.cpp b/ydb/core/kqp/common/events/events.cpp new file mode 100644 index 00000000000..4a55f64db54 --- /dev/null +++ b/ydb/core/kqp/common/events/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NKqp { + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h new file mode 100644 index 00000000000..9a0702f849b --- /dev/null +++ b/ydb/core/kqp/common/events/events.h @@ -0,0 +1,124 @@ +#pragma once + +#include "process_response.h" +#include "query.h" +#include <ydb/core/kqp/common/simple/kqp_event_ids.h> +#include <ydb/core/protos/kqp.pb.h> +#include <ydb/core/kqp/common/compilation/events.h> +#include <ydb/core/kqp/common/shutdown/events.h> +#include <ydb/public/api/protos/draft/ydb_query.pb.h> +#include <ydb/library/yql/dq/actors/dq.h> + +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/event_load.h> +#include <contrib/libs/protobuf/src/google/protobuf/map.h> + +namespace NKikimr::NKqp { + +struct TEvKqp { + using TEvQueryRequestRemote = NPrivateEvents::TEvQueryRequestRemote; + + using TEvProcessResponse = NPrivateEvents::TEvProcessResponse; + + using TEvQueryRequest = NPrivateEvents::TEvQueryRequest; + + struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest, + NKikimrKqp::TEvCloseSessionRequest, TKqpEvents::EvCloseSessionRequest> {}; + + struct TEvCreateSessionRequest : public TEventPB<TEvCreateSessionRequest, + NKikimrKqp::TEvCreateSessionRequest, TKqpEvents::EvCreateSessionRequest> {}; + + struct TEvPingSessionRequest : public TEventPB<TEvPingSessionRequest, + NKikimrKqp::TEvPingSessionRequest, TKqpEvents::EvPingSessionRequest> {}; + + + using TEvCompileRequest = NPrivateEvents::TEvCompileRequest; + using TEvRecompileRequest = NPrivateEvents::TEvRecompileRequest; + using TEvCompileResponse = NPrivateEvents::TEvCompileResponse; + using TEvCompileInvalidateRequest = NPrivateEvents::TEvCompileInvalidateRequest; + + using TEvInitiateSessionShutdown = NKikimr::NKqp::NPrivateEvents::TEvInitiateSessionShutdown; + using TEvContinueShutdown = NKikimr::NKqp::NPrivateEvents::TEvContinueShutdown; + + using TEvDataQueryStreamPart = NPrivateEvents::TEvDataQueryStreamPart; + + struct TEvDataQueryStreamPartAck : public TEventLocal<TEvDataQueryStreamPartAck, TKqpEvents::EvDataQueryStreamPartAck> {}; + + template <typename TProto> + using TProtoArenaHolder = NPrivateEvents::TProtoArenaHolder<TProto>; + + using TEvQueryResponse = NPrivateEvents::TEvQueryResponse; + + struct TEvCreateSessionResponse : public TEventPB<TEvCreateSessionResponse, + NKikimrKqp::TEvCreateSessionResponse, TKqpEvents::EvCreateSessionResponse> {}; + + struct TEvContinueProcess : public TEventLocal<TEvContinueProcess, TKqpEvents::EvContinueProcess> { + TEvContinueProcess(ui32 queryId, bool finished) + : QueryId(queryId) + , Finished(finished) {} + + ui32 QueryId; + bool Finished; + }; + + using TEvQueryTimeout = NPrivateEvents::TEvQueryTimeout; + + struct TEvIdleTimeout : public TEventLocal<TEvIdleTimeout, TKqpEvents::EvIdleTimeout> { + TEvIdleTimeout(ui32 timerId) + : TimerId(timerId) {} + + ui32 TimerId; + }; + + struct TEvCloseSessionResponse : public TEventPB<TEvCloseSessionResponse, + NKikimrKqp::TEvCloseSessionResponse, TKqpEvents::EvCloseSessionResponse> {}; + + struct TEvPingSessionResponse : public TEventPB<TEvPingSessionResponse, + NKikimrKqp::TEvPingSessionResponse, TKqpEvents::EvPingSessionResponse> {}; + + struct TEvKqpProxyPublishRequest : + public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {}; + + using TEvInitiateShutdownRequest = NPrivateEvents::TEvInitiateShutdownRequest; + + struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> { + TEvScriptRequest() = default; + + mutable NKikimrKqp::TEvQueryRequest Record; + }; + + struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> { + TEvScriptResponse(TString operationId, TString executionId, Ydb::Query::ExecStatus execStatus, Ydb::Query::ExecMode execMode) + : Status(Ydb::StatusIds::SUCCESS) + , OperationId(std::move(operationId)) + , ExecutionId(std::move(executionId)) + , ExecStatus(execStatus) + , ExecMode(execMode) + {} + + TEvScriptResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Issues(std::move(issues)) + , ExecStatus(Ydb::Query::EXEC_STATUS_FAILED) + , ExecMode(Ydb::Query::EXEC_MODE_UNSPECIFIED) + {} + + const Ydb::StatusIds::StatusCode Status; + const NYql::TIssues Issues; + const TString OperationId; + const TString ExecutionId; + const Ydb::Query::ExecStatus ExecStatus; + const Ydb::Query::ExecMode ExecMode; + }; + + using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution; + + struct TEvFetchScriptResultsRequest : public TEventPB<TEvFetchScriptResultsRequest, NKikimrKqp::TEvFetchScriptResultsRequest, TKqpEvents::EvFetchScriptResultsRequest> { + }; + + struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> { + }; +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/process_response.h b/ydb/core/kqp/common/events/process_response.h index bdae89bb29d..3c0adea39ce 100644 --- a/ydb/core/kqp/common/events/process_response.h +++ b/ydb/core/kqp/common/events/process_response.h @@ -2,7 +2,7 @@ #include <ydb/core/protos/kqp.pb.h> #include <library/cpp/actors/core/event_pb.h> #include <util/generic/ptr.h> -#include "kqp_event_ids.h" +#include <ydb/core/kqp/common/simple/kqp_event_ids.h> namespace NKikimr::NKqp::NPrivateEvents { diff --git a/ydb/core/kqp/common/events/query.cpp b/ydb/core/kqp/common/events/query.cpp new file mode 100644 index 00000000000..b6724b22ae7 --- /dev/null +++ b/ydb/core/kqp/common/events/query.cpp @@ -0,0 +1,5 @@ +#include "query.h" + +namespace NKikimr::NKqp { + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h new file mode 100644 index 00000000000..c439cfffc06 --- /dev/null +++ b/ydb/core/kqp/common/events/query.h @@ -0,0 +1,388 @@ +#pragma once +#include <ydb/core/protos/kqp.pb.h> +#include <ydb/core/kqp/common/simple/kqp_event_ids.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/cancelation/cancelation_event.h> +#include <ydb/core/grpc_services/cancelation/cancelation.h> + +#include <ydb/public/api/protos/draft/ydb_query.pb.h> +#include <ydb/library/aclib/aclib.h> +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/event_local.h> + +namespace NKikimr::NKqp::NPrivateEvents { + +struct TEvQueryRequestRemote: public TEventPB<TEvQueryRequestRemote, NKikimrKqp::TEvQueryRequest, + TKqpEvents::EvQueryRequest> { +}; + +struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> { +public: + TEvQueryRequest( + NKikimrKqp::EQueryAction queryAction, + NKikimrKqp::EQueryType queryType, + TActorId requestActorId, + const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx, + const TString& sessionId, + TString&& yqlText, + TString&& queryId, + const ::Ydb::Table::TransactionControl* txControl, + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters, + const ::Ydb::Table::QueryStatsCollection::Mode collectStats, + const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, + const ::Ydb::Operations::OperationParams* operationParams, + bool keepSession = false); + + TEvQueryRequest() = default; + + bool IsSerializable() const override { + return true; + } + + TEventSerializationInfo CreateSerializationInfo() const override { return {}; } + + const TString& GetDatabase() const { + return RequestCtx ? Database : Record.GetRequest().GetDatabase(); + } + + bool HasYdbStatus() const { + return RequestCtx ? false : Record.HasYdbStatus(); + } + + const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { + return Record.GetRequest().GetTopicOperations(); + } + + bool HasTopicOperations() const { + return Record.GetRequest().HasTopicOperations(); + } + + bool GetKeepSession() const { + return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession(); + } + + TDuration GetCancelAfter() const { + return RequestCtx ? CancelAfter : TDuration::MilliSeconds(Record.GetRequest().GetCancelAfterMs()); + } + + TDuration GetOperationTimeout() const { + return RequestCtx ? OperationTimeout : TDuration::MilliSeconds(Record.GetRequest().GetTimeoutMs()); + } + + bool HasAction() const { + return RequestCtx ? true : Record.GetRequest().HasAction(); + } + + void SetSessionId(const TString& sessionId) { + if (RequestCtx) { + SessionId = sessionId; + } else { + Record.MutableRequest()->SetSessionId(sessionId); + } + } + + const TString& GetSessionId() const { + return RequestCtx ? SessionId : Record.GetRequest().GetSessionId(); + } + + NKikimrKqp::EQueryAction GetAction() const { + return RequestCtx ? QueryAction : Record.GetRequest().GetAction(); + } + + NKikimrKqp::EQueryType GetType() const { + return RequestCtx ? QueryType : Record.GetRequest().GetType(); + } + + bool HasPreparedQuery() const { + return RequestCtx ? QueryId.size() > 0 : Record.GetRequest().HasPreparedQuery(); + } + + const TString& GetPreparedQuery() const { + return RequestCtx ? QueryId : Record.GetRequest().GetPreparedQuery(); + } + + const TString& GetQuery() const { + return RequestCtx ? YqlText : Record.GetRequest().GetQuery(); + } + + const ::NKikimrMiniKQL::TParams& GetParameters() const { + return Record.GetRequest().GetParameters(); + } + + const ::Ydb::Table::TransactionControl& GetTxControl() const { + return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl(); + } + + bool GetUsePublicResponseDataFormat() const { + return RequestCtx ? true : Record.GetRequest().GetUsePublicResponseDataFormat(); + } + + bool GetQueryKeepInCache() const { + if (RequestCtx) { + if (QueryCachePolicy != nullptr) { + return QueryCachePolicy->keep_in_cache(); + } + return false; + } + return Record.GetRequest().GetQueryCachePolicy().keep_in_cache(); + } + + bool HasTxControl() const { + return RequestCtx ? TxControl != nullptr : Record.GetRequest().HasTxControl(); + } + + bool HasCollectStats() const { + return RequestCtx ? true : Record.GetRequest().HasCollectStats(); + } + + TActorId GetRequestActorId() const { + return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId()); + } + + google::protobuf::Arena* GetArena() { + return RequestCtx ? RequestCtx->GetArena() : nullptr; + } + + const TString& GetTraceId() const { + if (RequestCtx) { + if (!TraceId) { + TraceId = RequestCtx->GetTraceId().GetOrElse(""); + } + return TraceId; + } + + return Record.GetTraceId(); + } + + const TString& GetRequestType() const { + if (RequestCtx) { + if (!RequestType) { + RequestType = RequestCtx->GetRequestType().GetOrElse(""); + } + return RequestType; + } + + return Record.GetRequestType(); + } + + const TIntrusiveConstPtr<NACLib::TUserToken>& GetUserToken() const { + if (RequestCtx && RequestCtx->GetInternalToken()) { + return RequestCtx->GetInternalToken(); + } + + if (Token_) { + return Token_; + } + + Token_ = new NACLib::TUserToken(Record.GetUserToken()); + return Token_; + } + + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const { + if (YdbParameters) { + return *YdbParameters; + } + + return Record.GetRequest().GetYdbParameters(); + } + + Ydb::StatusIds::StatusCode GetYdbStatus() const { + return Record.GetYdbStatus(); + } + + ::Ydb::Table::QueryStatsCollection::Mode GetCollectStats() const { + if (RequestCtx) { + return CollectStats; + } + + return Record.GetRequest().GetCollectStats(); + } + + const ::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& GetQueryIssues() const { + return Record.GetQueryIssues(); + } + + ui64 GetRequestSize() const { + return Record.GetRequest().ByteSizeLong(); + } + + ui64 GetQuerySize() const { + return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size(); + } + + bool IsInternalCall() const { + return RequestCtx ? RequestCtx->IsInternalCall() : Record.GetRequest().GetIsInternalCall(); + } + + ui64 GetParametersSize() const { + if (ParametersSize > 0) { + return ParametersSize; + } + + ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong(); + for (const auto& [name, param] : GetYdbParameters()) { + ParametersSize += name.size(); + ParametersSize += param.ByteSizeLong(); + } + + return ParametersSize; + } + + ui32 CalculateSerializedSize() const override { + PrepareRemote(); + return Record.ByteSize(); + } + + bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override { + PrepareRemote(); + return Record.SerializeToZeroCopyStream(chunker); + } + + static NActors::IEventBase* Load(TEventSerializedData* data) { + auto pbEv = THolder<TEvQueryRequestRemote>(static_cast<TEvQueryRequestRemote*>(TEvQueryRequestRemote::Load(data))); + auto req = new TEvQueryRequest(); + req->Record.Swap(&pbEv->Record); + return req; + } + + void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) { + if (RequestCtx) { + RequestCtx->SetClientLostAction([actorId, as]() { + as->Send(actorId, new NGRpcService::TEvClientLost()); + }); + } else if (Record.HasCancelationActor()) { + auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor()); + NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, as); + } + } + + void PrepareRemote() const; + + mutable NKikimrKqp::TEvQueryRequest Record; + +private: + mutable ui64 ParametersSize = 0; + mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx; + mutable TString TraceId; + mutable TString RequestType; + mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_; + TActorId RequestActorId; + TString Database; + TString SessionId; + TString YqlText; + TString QueryId; + NKikimrKqp::EQueryAction QueryAction; + NKikimrKqp::EQueryType QueryType; + const ::Ydb::Table::TransactionControl* TxControl = nullptr; + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr; + const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; + const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr; + const ::Ydb::Operations::OperationParams* OperationParams = nullptr; + bool KeepSession = false; + TDuration OperationTimeout; + TDuration CancelAfter; +}; + +struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart, + NKikimrKqp::TEvDataQueryStreamPart, TKqpEvents::EvDataQueryStreamPart> { +}; + +struct TEvDataQueryStreamPartAck: public TEventLocal<TEvDataQueryStreamPartAck, TKqpEvents::EvDataQueryStreamPartAck> {}; + +// Wrapper to use Arena allocated protobuf with ActorSystem (for serialization path). +// Arena deserialization is not supported. +// TODO: Add arena support to actor system TEventPB? +template<typename TProto> +class TProtoArenaHolder: public TNonCopyable { +public: + TProtoArenaHolder() + : Protobuf_(google::protobuf::Arena::CreateMessage<TProto>(nullptr)) + , NeedDelete_(true) { + } + + ~TProtoArenaHolder() { + // Deallocate message only if it was "normal" allocation + // In case of protobuf arena memory will be freed during arena deallocation + if (NeedDelete_) { + delete Protobuf_; + } + } + + void Realloc(std::shared_ptr<google::protobuf::Arena> arena) { + ReallocRef(arena.get()); + Arena_ = arena; + } + + void ReallocRef(google::protobuf::Arena* arena) { + // Allow realloc only if previous allocation was made using "normal" allocator + // and no data was writen. It prevents ineffective using of protobuf. + Y_ASSERT(!Protobuf_->GetArena()); + Y_ASSERT(ByteSize() == 0); + delete Protobuf_; + Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena); + if (arena) { + NeedDelete_ = false; + } + } + + bool ParseFromString(const TString& data) { + return Protobuf_->ParseFromString(data); + } + + bool ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream* input) { + return Protobuf_->ParseFromZeroCopyStream(input); + } + + bool SerializeToZeroCopyStream(google::protobuf::io::ZeroCopyOutputStream* output) const { + return Protobuf_->SerializeToZeroCopyStream(output); + } + + bool SerializeToString(TString* output) const { + return Protobuf_->SerializeToString(output); + } + + int ByteSize() const { + return Protobuf_->ByteSize(); + } + + TString DebugString() const { + return Protobuf_->DebugString(); + } + + TString ShortDebugString() const { + return Protobuf_->ShortDebugString(); + } + + TString GetTypeName() const { + return Protobuf_->GetTypeName(); + } + + const TProto& GetRef() const { + return *Protobuf_; + } + + TProto& GetRef() { + return *Protobuf_; + } + +private: + TProtoArenaHolder(TProtoArenaHolder&&) = default; + TProtoArenaHolder& operator=(TProtoArenaHolder&&) = default; + TProto* Protobuf_; + std::shared_ptr<google::protobuf::Arena> Arena_; + bool NeedDelete_; +}; + +struct TEvQueryTimeout: public TEventLocal<TEvQueryTimeout, TKqpEvents::EvQueryTimeout> { + TEvQueryTimeout(ui32 queryId) + : QueryId(queryId) { + } + + ui32 QueryId; +}; + +struct TEvQueryResponse: public TEventPB<TEvQueryResponse, TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>, + TKqpEvents::EvQueryResponse> { +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/kqp.cpp b/ydb/core/kqp/common/kqp.cpp index fe9f7d85663..7f3bdff3383 100644 --- a/ydb/core/kqp/common/kqp.cpp +++ b/ydb/core/kqp/common/kqp.cpp @@ -6,44 +6,4 @@ namespace NKikimr::NKqp { -TKqpShutdownController::TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool enableGraceful) - : KqpProxyActorId_(kqpProxyActorId) - , EnableGraceful(enableGraceful) - , TableServiceConfig(tableServiceConfig) -{ - ShutdownState_.Reset(new TKqpShutdownState()); -} - -void TKqpShutdownController::Initialize(NActors::TActorSystem* actorSystem) { - ActorSystem_ = actorSystem; -} - -void TKqpShutdownController::Stop() { - if (!EnableGraceful) - return; - - ActorSystem_->Send(new NActors::IEventHandle(KqpProxyActorId_, {}, new TEvKqp::TEvInitiateShutdownRequest(ShutdownState_))); - auto timeout = TDuration::MilliSeconds(TableServiceConfig.GetShutdownSettings().GetShutdownTimeoutMs()); - auto startedAt = TInstant::Now(); - auto spent = (TInstant::Now() - startedAt).SecondsFloat(); - ui32 iteration = 0; - while (spent < timeout.SecondsFloat()) { - if (iteration % 30 == 0) { - Cerr << "Current KQP shutdown state: spent " << spent << " seconds, "; - if (ShutdownState_->Initialized()) { - Cerr << ShutdownState_->GetPendingSessions() << " sessions to shutdown" << Endl; - } else { - Cerr << "not started yet" << Endl; - } - } - - if (ShutdownState_->ShutdownComplete()) - break; - - Sleep(TDuration::MilliSeconds(100)); - ++iteration; - spent = (TInstant::Now() - startedAt).SecondsFloat(); - } -} - } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 3a8013ca654..9894f5f0b64 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -4,8 +4,10 @@ #include "simple/helpers.h" #include "simple/query_id.h" #include "simple/settings.h" -#include "events/process_response.h" +#include "simple/services.h" +#include "events/events.h" #include "compilation/result.h" +#include "shutdown/controller.h" #include <library/cpp/lwtrace/shuttle.h> @@ -22,8 +24,7 @@ #include <util/generic/guid.h> #include <util/generic/ptr.h> -namespace NKikimr { -namespace NKqp { +namespace NKikimr::NKqp { void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to); @@ -35,631 +36,6 @@ inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) { } } -const TStringBuf DefaultKikimrPublicClusterName = "db"; - -inline NActors::TActorId MakeKqpProxyID(ui32 nodeId) { - const char name[12] = "kqp_proxy"; - return NActors::TActorId(nodeId, TStringBuf(name, 12)); -} - -inline NActors::TActorId MakeKqpCompileServiceID(ui32 nodeId) { - const char name[12] = "kqp_compile"; - return NActors::TActorId(nodeId, TStringBuf(name, 12)); -} - -inline NActors::TActorId MakeKqpResourceManagerServiceID(ui32 nodeId) { - const char name[12] = "kqp_resman"; - return NActors::TActorId(nodeId, TStringBuf(name, 12)); -} - -inline NActors::TActorId MakeKqpRmServiceID(ui32 nodeId) { - const char name[12] = "kqp_rm"; - return NActors::TActorId(nodeId, TStringBuf(name, 12)); -} - -inline NActors::TActorId MakeKqpNodeServiceID(ui32 nodeId) { - const char name[12] = "kqp_node"; - return NActors::TActorId(nodeId, TStringBuf(name, 12)); -} - -inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) { - const char name[12] = "kqp_lfspill"; - return NActors::TActorId(nodeId, TStringBuf(name, 12)); -} - -class TKqpShutdownController; - -class TKqpShutdownState : public TThrRefBase { - friend class TKqpShutdownController; - -public: - void Update(ui32 pendingSessions) { - AtomicSet(PendingSessions_, pendingSessions); - - if (!Initialized()) { - AtomicSet(Initialized_, 1); - } - - if (!pendingSessions) { - SetCompleted(); - } - } -private: - bool ShutdownComplete() const { - return AtomicGet(ShutdownComplete_) == 1; - } - - ui32 GetPendingSessions() const { - return AtomicGet(PendingSessions_); - } - - bool Initialized() const { - return AtomicGet(Initialized_) == 1; - } - - void SetCompleted() { - AtomicSet(ShutdownComplete_, 1); - } - - TAtomic PendingSessions_ = 0; - TAtomic Initialized_ = 0; - TAtomic ShutdownComplete_ = 0; -}; - - -class TKqpShutdownController { -public: - TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool gracefulEnabled); - ~TKqpShutdownController() = default; - - void Initialize(NActors::TActorSystem* actorSystem); - void Stop(); - -private: - NActors::TActorId KqpProxyActorId_; - NActors::TActorSystem* ActorSystem_; - bool EnableGraceful; - NKikimrConfig::TTableServiceConfig TableServiceConfig; - TIntrusivePtr<TKqpShutdownState> ShutdownState_; -}; - -struct TEvKqp { - struct TEvQueryRequestRemote : public TEventPB<TEvQueryRequestRemote, NKikimrKqp::TEvQueryRequest, - TKqpEvents::EvQueryRequest> {}; - - using TEvProcessResponse = NPrivateEvents::TEvProcessResponse; - - struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> { - public: - TEvQueryRequest( - NKikimrKqp::EQueryAction queryAction, - NKikimrKqp::EQueryType queryType, - TActorId requestActorId, - const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx, - const TString& sessionId, - TString&& yqlText, - TString&& queryId, - const ::Ydb::Table::TransactionControl* txControl, - const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters, - const ::Ydb::Table::QueryStatsCollection::Mode collectStats, - const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, - const ::Ydb::Operations::OperationParams* operationParams, - bool keepSession = false); - - TEvQueryRequest() = default; - - bool IsSerializable() const override { - return true; - } - - TEventSerializationInfo CreateSerializationInfo() const override { return {}; } - - const TString& GetDatabase() const { - return RequestCtx ? Database : Record.GetRequest().GetDatabase(); - } - - bool HasYdbStatus() const { - return RequestCtx ? false : Record.HasYdbStatus(); - } - - const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { - return Record.GetRequest().GetTopicOperations(); - } - - bool HasTopicOperations() const { - return Record.GetRequest().HasTopicOperations(); - } - - bool GetKeepSession() const { - return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession(); - } - - TDuration GetCancelAfter() const { - return RequestCtx ? CancelAfter : TDuration::MilliSeconds(Record.GetRequest().GetCancelAfterMs()); - } - - TDuration GetOperationTimeout() const { - return RequestCtx ? OperationTimeout : TDuration::MilliSeconds(Record.GetRequest().GetTimeoutMs()); - } - - bool HasAction() const { - return RequestCtx ? true : Record.GetRequest().HasAction(); - } - - void SetSessionId(const TString& sessionId) { - if (RequestCtx) { - SessionId = sessionId; - } else { - Record.MutableRequest()->SetSessionId(sessionId); - } - } - - const TString& GetSessionId() const { - return RequestCtx ? SessionId : Record.GetRequest().GetSessionId(); - } - - NKikimrKqp::EQueryAction GetAction() const { - return RequestCtx ? QueryAction : Record.GetRequest().GetAction(); - } - - NKikimrKqp::EQueryType GetType() const { - return RequestCtx ? QueryType : Record.GetRequest().GetType(); - } - - bool HasPreparedQuery() const { - return RequestCtx ? QueryId.size() > 0 : Record.GetRequest().HasPreparedQuery(); - } - - const TString& GetPreparedQuery() const { - return RequestCtx ? QueryId : Record.GetRequest().GetPreparedQuery(); - } - - const TString& GetQuery() const { - return RequestCtx ? YqlText : Record.GetRequest().GetQuery(); - } - - const ::NKikimrMiniKQL::TParams& GetParameters() const { - return Record.GetRequest().GetParameters(); - } - - const ::Ydb::Table::TransactionControl& GetTxControl() const { - return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl(); - } - - bool GetUsePublicResponseDataFormat() const { - return RequestCtx ? true : Record.GetRequest().GetUsePublicResponseDataFormat(); - } - - bool GetQueryKeepInCache() const { - if (RequestCtx) { - if (QueryCachePolicy != nullptr) { - return QueryCachePolicy->keep_in_cache(); - } - return false; - } - return Record.GetRequest().GetQueryCachePolicy().keep_in_cache(); - } - - bool HasTxControl() const { - return RequestCtx ? TxControl != nullptr : Record.GetRequest().HasTxControl(); - } - - bool HasCollectStats() const { - return RequestCtx ? true : Record.GetRequest().HasCollectStats(); - } - - TActorId GetRequestActorId() const { - return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId()); - } - - google::protobuf::Arena* GetArena() { - return RequestCtx ? RequestCtx->GetArena() : nullptr; - } - - const TString& GetTraceId() const { - if (RequestCtx) { - if (!TraceId) { - TraceId = RequestCtx->GetTraceId().GetOrElse(""); - } - return TraceId; - } - - return Record.GetTraceId(); - } - - const TString& GetRequestType() const { - if (RequestCtx) { - if (!RequestType) { - RequestType = RequestCtx->GetRequestType().GetOrElse(""); - } - return RequestType; - } - - return Record.GetRequestType(); - } - - const TIntrusiveConstPtr<NACLib::TUserToken>& GetUserToken() const { - if (RequestCtx && RequestCtx->GetInternalToken()) { - return RequestCtx->GetInternalToken(); - } - - if (Token_) { - return Token_; - } - - Token_ = new NACLib::TUserToken(Record.GetUserToken()); - return Token_; - } - - const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const { - if (YdbParameters) { - return *YdbParameters; - } - - return Record.GetRequest().GetYdbParameters(); - } - - Ydb::StatusIds::StatusCode GetYdbStatus() const { - return Record.GetYdbStatus(); - } - - ::Ydb::Table::QueryStatsCollection::Mode GetCollectStats() const { - if (RequestCtx) { - return CollectStats; - } - - return Record.GetRequest().GetCollectStats(); - } - - const ::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& GetQueryIssues() const { - return Record.GetQueryIssues(); - } - - ui64 GetRequestSize() const { - return Record.GetRequest().ByteSizeLong(); - } - - ui64 GetQuerySize() const { - return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size(); - } - - bool IsInternalCall() const { - return RequestCtx ? RequestCtx->IsInternalCall() : Record.GetRequest().GetIsInternalCall(); - } - - ui64 GetParametersSize() const { - if (ParametersSize > 0) { - return ParametersSize; - } - - ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong(); - for(const auto& [name, param]: GetYdbParameters()) { - ParametersSize += name.size(); - ParametersSize += param.ByteSizeLong(); - } - - return ParametersSize; - } - - ui32 CalculateSerializedSize() const override { - PrepareRemote(); - return Record.ByteSize(); - } - - bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override { - PrepareRemote(); - return Record.SerializeToZeroCopyStream(chunker); - } - - static NActors::IEventBase* Load(TEventSerializedData* data) { - auto pbEv = THolder<TEvQueryRequestRemote>(static_cast<TEvQueryRequestRemote*>(TEvQueryRequestRemote::Load(data))); - auto req = new TEvQueryRequest(); - req->Record.Swap(&pbEv->Record); - return req; - } - - void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) { - if (RequestCtx) { - RequestCtx->SetClientLostAction([actorId, as]() { - as->Send(actorId, new NGRpcService::TEvClientLost()); - }); - } else if (Record.HasCancelationActor()) { - auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor()); - NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, as); - } - } - - void PrepareRemote() const; - - mutable NKikimrKqp::TEvQueryRequest Record; - - private: - mutable ui64 ParametersSize = 0; - mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx; - mutable TString TraceId; - mutable TString RequestType; - mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_; - TActorId RequestActorId; - TString Database; - TString SessionId; - TString YqlText; - TString QueryId; - NKikimrKqp::EQueryAction QueryAction; - NKikimrKqp::EQueryType QueryType; - const ::Ydb::Table::TransactionControl* TxControl = nullptr; - const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr; - const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; - const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr; - const ::Ydb::Operations::OperationParams* OperationParams = nullptr; - bool KeepSession = false; - TDuration OperationTimeout; - TDuration CancelAfter; - }; - - struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest, - NKikimrKqp::TEvCloseSessionRequest, TKqpEvents::EvCloseSessionRequest> {}; - - struct TEvCreateSessionRequest : public TEventPB<TEvCreateSessionRequest, - NKikimrKqp::TEvCreateSessionRequest, TKqpEvents::EvCreateSessionRequest> {}; - - struct TEvPingSessionRequest : public TEventPB<TEvPingSessionRequest, - NKikimrKqp::TEvPingSessionRequest, TKqpEvents::EvPingSessionRequest> {}; - - struct TEvInitiateSessionShutdown : public TEventLocal<TEvInitiateSessionShutdown, TKqpEvents::EvInitiateSessionShutdown> { - ui32 SoftTimeoutMs; - ui32 HardTimeoutMs; - - TEvInitiateSessionShutdown(ui32 softTimeoutMs, ui32 hardTimeoutMs) - : SoftTimeoutMs(softTimeoutMs) - , HardTimeoutMs(hardTimeoutMs) - {} - }; - - struct TEvContinueShutdown : public TEventLocal<TEvContinueShutdown, TKqpEvents::EvContinueShutdown> {}; - - struct TEvDataQueryStreamPart : public TEventPB<TEvDataQueryStreamPart, - NKikimrKqp::TEvDataQueryStreamPart, TKqpEvents::EvDataQueryStreamPart> {}; - - struct TEvDataQueryStreamPartAck : public TEventLocal<TEvDataQueryStreamPartAck, TKqpEvents::EvDataQueryStreamPartAck> {}; - - // Wrapper to use Arena allocated protobuf with ActorSystem (for serialization path). - // Arena deserialization is not supported. - // TODO: Add arena support to actor system TEventPB? - template<typename TProto> - class TProtoArenaHolder : public TNonCopyable { - public: - TProtoArenaHolder() - : Protobuf_(google::protobuf::Arena::CreateMessage<TProto>(nullptr)) - , NeedDelete_(true) - {} - - ~TProtoArenaHolder() { - // Deallocate message only if it was "normal" allocation - // In case of protobuf arena memory will be freed during arena deallocation - if (NeedDelete_) { - delete Protobuf_; - } - } - - void Realloc(std::shared_ptr<google::protobuf::Arena> arena) { - ReallocRef(arena.get()); - Arena_ = arena; - } - - void ReallocRef(google::protobuf::Arena* arena) { - // Allow realloc only if previous allocation was made using "normal" allocator - // and no data was writen. It prevents ineffective using of protobuf. - Y_ASSERT(!Protobuf_->GetArena()); - Y_ASSERT(ByteSize() == 0); - delete Protobuf_; - Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena); - if (arena) { - NeedDelete_ = false; - } - } - - bool ParseFromString(const TString& data) { - return Protobuf_->ParseFromString(data); - } - - bool ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream* input) { - return Protobuf_->ParseFromZeroCopyStream(input); - } - - bool SerializeToZeroCopyStream(google::protobuf::io::ZeroCopyOutputStream* output) const { - return Protobuf_->SerializeToZeroCopyStream(output); - } - - bool SerializeToString(TString* output) const { - return Protobuf_->SerializeToString(output); - } - - int ByteSize() const { - return Protobuf_->ByteSize(); - } - - TString DebugString() const { - return Protobuf_->DebugString(); - } - - TString ShortDebugString() const { - return Protobuf_->ShortDebugString(); - } - - TString GetTypeName() const { - return Protobuf_->GetTypeName(); - } - - const TProto& GetRef() const { - return *Protobuf_; - } - - TProto& GetRef() { - return *Protobuf_; - } - - private: - TProtoArenaHolder(TProtoArenaHolder&&) = default; - TProtoArenaHolder& operator=(TProtoArenaHolder&&) = default; - TProto* Protobuf_; - std::shared_ptr<google::protobuf::Arena> Arena_; - bool NeedDelete_; - }; - - struct TEvQueryResponse : public TEventPB<TEvQueryResponse, TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>, - TKqpEvents::EvQueryResponse> {}; - - struct TEvCreateSessionResponse : public TEventPB<TEvCreateSessionResponse, - NKikimrKqp::TEvCreateSessionResponse, TKqpEvents::EvCreateSessionResponse> {}; - - struct TEvContinueProcess : public TEventLocal<TEvContinueProcess, TKqpEvents::EvContinueProcess> { - TEvContinueProcess(ui32 queryId, bool finished) - : QueryId(queryId) - , Finished(finished) {} - - ui32 QueryId; - bool Finished; - }; - - struct TEvQueryTimeout : public TEventLocal<TEvQueryTimeout, TKqpEvents::EvQueryTimeout> { - TEvQueryTimeout(ui32 queryId) - : QueryId(queryId) {} - - ui32 QueryId; - }; - - struct TEvIdleTimeout : public TEventLocal<TEvIdleTimeout, TKqpEvents::EvIdleTimeout> { - TEvIdleTimeout(ui32 timerId) - : TimerId(timerId) {} - - ui32 TimerId; - }; - - struct TEvCloseSessionResponse : public TEventPB<TEvCloseSessionResponse, - NKikimrKqp::TEvCloseSessionResponse, TKqpEvents::EvCloseSessionResponse> {}; - - struct TEvPingSessionResponse : public TEventPB<TEvPingSessionResponse, - NKikimrKqp::TEvPingSessionResponse, TKqpEvents::EvPingSessionResponse> {}; - - struct TEvCompileRequest : public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> { - TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query, - bool keepInCache, TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) - : UserToken(userToken) - , Uid(uid) - , Query(std::move(query)) - , KeepInCache(keepInCache) - , Deadline(deadline) - , DbCounters(dbCounters) - , Orbit(std::move(orbit)) - { - Y_ENSURE(Uid.Defined() != Query.Defined()); - } - - TIntrusiveConstPtr<NACLib::TUserToken> UserToken; - TMaybe<TString> Uid; - TMaybe<TKqpQueryId> Query; - bool KeepInCache = false; - // it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration) - TInstant Deadline; - TKqpDbCountersPtr DbCounters; - TMaybe<bool> DocumentApiRestricted; - - NLWTrace::TOrbit Orbit; - }; - - struct TEvRecompileRequest : public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> { - TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query, - TInstant deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}) - : UserToken(userToken) - , Uid(uid) - , Query(query) - , Deadline(deadline) - , DbCounters(dbCounters) - , Orbit(std::move(orbit)) {} - - TIntrusiveConstPtr<NACLib::TUserToken> UserToken; - TString Uid; - TMaybe<TKqpQueryId> Query; - - TInstant Deadline; - TKqpDbCountersPtr DbCounters; - - NLWTrace::TOrbit Orbit; - }; - - struct TEvCompileResponse : public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> { - TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {}) - : CompileResult(compileResult) - , Orbit(std::move(orbit)) {} - - TKqpCompileResult::TConstPtr CompileResult; - NKqpProto::TKqpStatsCompile Stats; - std::optional<TString> ReplayMessage; - - NLWTrace::TOrbit Orbit; - }; - - struct TEvKqpProxyPublishRequest : - public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {}; - - struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest, - TKqpEvents::EvCompileInvalidateRequest> - { - TEvCompileInvalidateRequest(const TString& uid, TKqpDbCountersPtr dbCounters) - : Uid(uid) - , DbCounters(dbCounters) {} - - TString Uid; - TKqpDbCountersPtr DbCounters; - }; - - struct TEvInitiateShutdownRequest : public TEventLocal<TEvInitiateShutdownRequest, TKqpEvents::EvInitiateShutdownRequest> { - TIntrusivePtr<TKqpShutdownState> ShutdownState; - - TEvInitiateShutdownRequest(TIntrusivePtr<TKqpShutdownState> ShutdownState) - : ShutdownState(ShutdownState) - {} - }; - - struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> { - TEvScriptRequest() = default; - - mutable NKikimrKqp::TEvQueryRequest Record; - }; - - struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> { - TEvScriptResponse(TString operationId, TString executionId, Ydb::Query::ExecStatus execStatus, Ydb::Query::ExecMode execMode) - : Status(Ydb::StatusIds::SUCCESS) - , OperationId(std::move(operationId)) - , ExecutionId(std::move(executionId)) - , ExecStatus(execStatus) - , ExecMode(execMode) - {} - - TEvScriptResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) - : Status(status) - , Issues(std::move(issues)) - , ExecStatus(Ydb::Query::EXEC_STATUS_FAILED) - , ExecMode(Ydb::Query::EXEC_MODE_UNSPECIFIED) - {} - - const Ydb::StatusIds::StatusCode Status; - const NYql::TIssues Issues; - const TString OperationId; - const TString ExecutionId; - const Ydb::Query::ExecStatus ExecStatus; - const Ydb::Query::ExecMode ExecMode; - }; - - using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution; - - struct TEvFetchScriptResultsRequest : public TEventPB<TEvFetchScriptResultsRequest, NKikimrKqp::TEvFetchScriptResultsRequest, TKqpEvents::EvFetchScriptResultsRequest> { - }; - - struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> { - }; -}; - class TKqpRequestInfo { public: TKqpRequestInfo(const TString& traceId, const TString& sessionId) @@ -743,5 +119,4 @@ static inline IOutputStream& operator<<(IOutputStream& stream, const TKqpRequest return stream; } -} // namespace NKqp -} // namespace NKikimr +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/kqp_event_ids.h b/ydb/core/kqp/common/kqp_event_ids.h index 491443c69f8..24ada529f59 100644 --- a/ydb/core/kqp/common/kqp_event_ids.h +++ b/ydb/core/kqp/common/kqp_event_ids.h @@ -1,3 +1,3 @@ #pragma once -#include "events/kqp_event_ids.h" +#include "simple/kqp_event_ids.h" diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..11cb4459e7d --- /dev/null +++ b/ydb/core/kqp/common/shutdown/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(kqp-common-shutdown) +target_compile_options(kqp-common-shutdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(kqp-common-shutdown PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core +) +target_sources(kqp-common-shutdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp +) diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..8996ef48176 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/CMakeLists.linux-aarch64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(kqp-common-shutdown) +target_compile_options(kqp-common-shutdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(kqp-common-shutdown PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core +) +target_sources(kqp-common-shutdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp +) diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..8996ef48176 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/CMakeLists.linux-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(kqp-common-shutdown) +target_compile_options(kqp-common-shutdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(kqp-common-shutdown PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core +) +target_sources(kqp-common-shutdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp +) diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.txt b/ydb/core/kqp/common/shutdown/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/kqp/common/shutdown/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/shutdown/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..11cb4459e7d --- /dev/null +++ b/ydb/core/kqp/common/shutdown/CMakeLists.windows-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(kqp-common-shutdown) +target_compile_options(kqp-common-shutdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(kqp-common-shutdown PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core +) +target_sources(kqp-common-shutdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/shutdown/events.cpp +) diff --git a/ydb/core/kqp/common/shutdown/controller.cpp b/ydb/core/kqp/common/shutdown/controller.cpp new file mode 100644 index 00000000000..a6c1bd609e2 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/controller.cpp @@ -0,0 +1,45 @@ +#include "controller.h" +#include "events.h" + +namespace NKikimr::NKqp { + +TKqpShutdownController::TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool enableGraceful) + : KqpProxyActorId_(kqpProxyActorId) + , EnableGraceful(enableGraceful) + , TableServiceConfig(tableServiceConfig) { + ShutdownState_.Reset(new TKqpShutdownState()); +} + +void TKqpShutdownController::Initialize(NActors::TActorSystem* actorSystem) { + ActorSystem_ = actorSystem; +} + +void TKqpShutdownController::Stop() { + if (!EnableGraceful) + return; + + ActorSystem_->Send(new NActors::IEventHandle(KqpProxyActorId_, {}, new NPrivateEvents::TEvInitiateShutdownRequest(ShutdownState_))); + auto timeout = TDuration::MilliSeconds(TableServiceConfig.GetShutdownSettings().GetShutdownTimeoutMs()); + auto startedAt = TInstant::Now(); + auto spent = (TInstant::Now() - startedAt).SecondsFloat(); + ui32 iteration = 0; + while (spent < timeout.SecondsFloat()) { + if (iteration % 30 == 0) { + Cerr << "Current KQP shutdown state: spent " << spent << " seconds, "; + if (ShutdownState_->Initialized()) { + Cerr << ShutdownState_->GetPendingSessions() << " sessions to shutdown" << Endl; + } else { + Cerr << "not started yet" << Endl; + } + } + + if (ShutdownState_->ShutdownComplete()) + break; + + Sleep(TDuration::MilliSeconds(100)); + ++iteration; + spent = (TInstant::Now() - startedAt).SecondsFloat(); + } +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/shutdown/controller.h b/ydb/core/kqp/common/shutdown/controller.h new file mode 100644 index 00000000000..91e7ac5519c --- /dev/null +++ b/ydb/core/kqp/common/shutdown/controller.h @@ -0,0 +1,26 @@ +#pragma once +#include "state.h" +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <ydb/core/protos/config.pb.h> + +namespace NKikimr::NKqp { + +class TKqpShutdownController { +public: + TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool gracefulEnabled); + ~TKqpShutdownController() = default; + + void Initialize(NActors::TActorSystem* actorSystem); + void Stop(); + +private: + NActors::TActorId KqpProxyActorId_; + NActors::TActorSystem* ActorSystem_; + bool EnableGraceful; + NKikimrConfig::TTableServiceConfig TableServiceConfig; + TIntrusivePtr<TKqpShutdownState> ShutdownState_; +}; + + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/shutdown/events.cpp b/ydb/core/kqp/common/shutdown/events.cpp new file mode 100644 index 00000000000..4a55f64db54 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NKqp { + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/shutdown/events.h b/ydb/core/kqp/common/shutdown/events.h new file mode 100644 index 00000000000..46371901723 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/events.h @@ -0,0 +1,28 @@ +#pragma once +#include "state.h" +#include <library/cpp/actors/core/event_local.h> +#include <ydb/core/kqp/common/simple/kqp_event_ids.h> + +namespace NKikimr::NKqp::NPrivateEvents { + +struct TEvInitiateSessionShutdown: public TEventLocal<TEvInitiateSessionShutdown, TKqpEvents::EvInitiateSessionShutdown> { + ui32 SoftTimeoutMs; + ui32 HardTimeoutMs; + + TEvInitiateSessionShutdown(ui32 softTimeoutMs, ui32 hardTimeoutMs) + : SoftTimeoutMs(softTimeoutMs) + , HardTimeoutMs(hardTimeoutMs) { + } +}; + +struct TEvContinueShutdown: public TEventLocal<TEvContinueShutdown, TKqpEvents::EvContinueShutdown> {}; + +struct TEvInitiateShutdownRequest: public TEventLocal<TEvInitiateShutdownRequest, TKqpEvents::EvInitiateShutdownRequest> { + TIntrusivePtr<TKqpShutdownState> ShutdownState; + + TEvInitiateShutdownRequest(TIntrusivePtr<TKqpShutdownState> ShutdownState) + : ShutdownState(ShutdownState) { + } +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/shutdown/state.cpp b/ydb/core/kqp/common/shutdown/state.cpp new file mode 100644 index 00000000000..82a66253b74 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/state.cpp @@ -0,0 +1,17 @@ +#include "state.h" + +namespace NKikimr::NKqp { + +void TKqpShutdownState::Update(ui32 pendingSessions) { + AtomicSet(PendingSessions_, pendingSessions); + + if (!Initialized()) { + AtomicSet(Initialized_, 1); + } + + if (!pendingSessions) { + SetCompleted(); + } +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/shutdown/state.h b/ydb/core/kqp/common/shutdown/state.h new file mode 100644 index 00000000000..515a6999c20 --- /dev/null +++ b/ydb/core/kqp/common/shutdown/state.h @@ -0,0 +1,36 @@ +#pragma once +#include <util/generic/ptr.h> +#include <library/cpp/deprecated/atomic/atomic.h> + +namespace NKikimr::NKqp { + +class TKqpShutdownController; + +class TKqpShutdownState: public TThrRefBase { + friend class TKqpShutdownController; + +public: + void Update(ui32 pendingSessions); +private: + bool ShutdownComplete() const { + return AtomicGet(ShutdownComplete_) == 1; + } + + ui32 GetPendingSessions() const { + return AtomicGet(PendingSessions_); + } + + bool Initialized() const { + return AtomicGet(Initialized_) == 1; + } + + void SetCompleted() { + AtomicSet(ShutdownComplete_, 1); + } + + TAtomic PendingSessions_ = 0; + TAtomic Initialized_ = 0; + TAtomic ShutdownComplete_ = 0; +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt index 28c9e36787d..6833c55e76a 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt @@ -12,9 +12,13 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-cxxsupp yutil ydb-core-protos + yql-dq-actors + ydb-core-base ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp ) diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt index 0c8ad81844e..f9217daae45 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt @@ -13,9 +13,13 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-cxxsupp yutil ydb-core-protos + yql-dq-actors + ydb-core-base ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp ) diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt index 0c8ad81844e..f9217daae45 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt @@ -13,9 +13,13 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-cxxsupp yutil ydb-core-protos + yql-dq-actors + ydb-core-base ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp ) diff --git a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt index 28c9e36787d..6833c55e76a 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt @@ -12,9 +12,13 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-cxxsupp yutil ydb-core-protos + yql-dq-actors + ydb-core-base ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/query_id.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/services.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/kqp_event_ids.cpp ) diff --git a/ydb/core/kqp/common/events/kqp_event_ids.cpp b/ydb/core/kqp/common/simple/kqp_event_ids.cpp index 593199fb384..593199fb384 100644 --- a/ydb/core/kqp/common/events/kqp_event_ids.cpp +++ b/ydb/core/kqp/common/simple/kqp_event_ids.cpp diff --git a/ydb/core/kqp/common/events/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 9723277a5cb..9723277a5cb 100644 --- a/ydb/core/kqp/common/events/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h diff --git a/ydb/core/kqp/common/simple/services.cpp b/ydb/core/kqp/common/simple/services.cpp new file mode 100644 index 00000000000..2a7a17627d4 --- /dev/null +++ b/ydb/core/kqp/common/simple/services.cpp @@ -0,0 +1,5 @@ +#include "services.h" + +namespace NKikimr::NKqp { + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h new file mode 100644 index 00000000000..3d598c95d67 --- /dev/null +++ b/ydb/core/kqp/common/simple/services.h @@ -0,0 +1,39 @@ +#pragma once +#include <util/generic/strbuf.h> +#include <library/cpp/actors/core/actorid.h> + +namespace NKikimr::NKqp { + +const TStringBuf DefaultKikimrPublicClusterName = "db"; + +inline NActors::TActorId MakeKqpProxyID(ui32 nodeId) { + const char name[12] = "kqp_proxy"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + +inline NActors::TActorId MakeKqpCompileServiceID(ui32 nodeId) { + const char name[12] = "kqp_compile"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + +inline NActors::TActorId MakeKqpResourceManagerServiceID(ui32 nodeId) { + const char name[12] = "kqp_resman"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + +inline NActors::TActorId MakeKqpRmServiceID(ui32 nodeId) { + const char name[12] = "kqp_rm"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + +inline NActors::TActorId MakeKqpNodeServiceID(ui32 nodeId) { + const char name[12] = "kqp_node"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + +inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) { + const char name[12] = "kqp_lfspill"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + +} // namespace NKikimr::NKqp |