diff options
author | dcherednik <dcherednik@ydb.tech> | 2022-12-19 18:21:27 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2022-12-19 18:21:27 +0300 |
commit | 25a02bcefbd499da8e6be48f161e58354e63cc3c (patch) | |
tree | ebe4fc3977764d273187dcaea4575c496869a559 | |
parent | bc9bfa700f812cea3b635c19f46b6db627b6d41e (diff) | |
download | ydb-25a02bcefbd499da8e6be48f161e58354e63cc3c.tar.gz |
Handle clent timeout in session actor.
26 files changed, 357 insertions, 7 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index a3bdbca2a40..61239219170 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -157,6 +157,7 @@ struct TKikimrEvents : TEvents { ES_METADATA_MANAGER, ES_METADATA_SECRET, ES_TEST_LOAD, + ES_GRPC_CANCELATION, }; }; diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt index bfbb8681c97..e42e68aba8d 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin.txt @@ -7,6 +7,7 @@ add_subdirectory(base) +add_subdirectory(cancelation) add_subdirectory(counters) add_subdirectory(local_rpc) add_subdirectory(ut) @@ -28,6 +29,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-formats core-grpc_services-counters core-grpc_services-local_rpc + core-grpc_services-cancelation ydb-core-health_check ydb-core-io_formats core-kesus-tablet diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index 882edf3557e..6d1930bcca9 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -7,6 +7,7 @@ add_subdirectory(base) +add_subdirectory(cancelation) add_subdirectory(counters) add_subdirectory(local_rpc) add_subdirectory(ut) @@ -29,6 +30,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-formats core-grpc_services-counters core-grpc_services-local_rpc + core-grpc_services-cancelation ydb-core-health_check ydb-core-io_formats core-kesus-tablet diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt index 882edf3557e..6d1930bcca9 100644 --- a/ydb/core/grpc_services/CMakeLists.linux.txt +++ b/ydb/core/grpc_services/CMakeLists.linux.txt @@ -7,6 +7,7 @@ add_subdirectory(base) +add_subdirectory(cancelation) add_subdirectory(counters) add_subdirectory(local_rpc) add_subdirectory(ut) @@ -29,6 +30,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-formats core-grpc_services-counters core-grpc_services-local_rpc + core-grpc_services-cancelation ydb-core-health_check ydb-core-io_formats core-kesus-tablet diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 722ed130213..197368c7a0a 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -386,6 +386,8 @@ public: virtual ~IRequestCtxMtSafe() = default; virtual const google::protobuf::Message* GetRequest() const = 0; virtual const TMaybe<TString> GetRequestType() const = 0; + // Implementation must be thread safe + virtual void SetClientLostAction(std::function<void()>&& cb) = 0; }; // Request context @@ -397,7 +399,6 @@ class IRequestCtx friend class TProtoResponseHelper; public: - virtual void SetClientLostAction(std::function<void()>&& cb) = 0; virtual google::protobuf::Message* GetRequestMut() = 0; virtual google::protobuf::Arena* GetArena() = 0; diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.darwin.txt b/ydb/core/grpc_services/cancelation/CMakeLists.darwin.txt new file mode 100644 index 00000000000..c9cab09c433 --- /dev/null +++ b/ydb/core/grpc_services/cancelation/CMakeLists.darwin.txt @@ -0,0 +1,21 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(protos) + +add_library(core-grpc_services-cancelation) +target_link_libraries(core-grpc_services-cancelation PUBLIC + contrib-libs-cxxsupp + yutil + grpc_services-cancelation-protos + core-grpc_services-base + ydb-core-protos +) +target_sources(core-grpc_services-cancelation PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/cancelation.cpp +) diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/cancelation/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..d5cdddddd0f --- /dev/null +++ b/ydb/core/grpc_services/cancelation/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(protos) + +add_library(core-grpc_services-cancelation) +target_link_libraries(core-grpc_services-cancelation PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + grpc_services-cancelation-protos + core-grpc_services-base + ydb-core-protos +) +target_sources(core-grpc_services-cancelation PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/cancelation.cpp +) diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.linux.txt b/ydb/core/grpc_services/cancelation/CMakeLists.linux.txt new file mode 100644 index 00000000000..d5cdddddd0f --- /dev/null +++ b/ydb/core/grpc_services/cancelation/CMakeLists.linux.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(protos) + +add_library(core-grpc_services-cancelation) +target_link_libraries(core-grpc_services-cancelation PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + grpc_services-cancelation-protos + core-grpc_services-base + ydb-core-protos +) +target_sources(core-grpc_services-cancelation PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/cancelation.cpp +) diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.txt b/ydb/core/grpc_services/cancelation/CMakeLists.txt new file mode 100644 index 00000000000..3e0811fb22e --- /dev/null +++ b/ydb/core/grpc_services/cancelation/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/grpc_services/cancelation/cancelation.cpp b/ydb/core/grpc_services/cancelation/cancelation.cpp new file mode 100644 index 00000000000..4b12dc9b53f --- /dev/null +++ b/ydb/core/grpc_services/cancelation/cancelation.cpp @@ -0,0 +1,29 @@ +#include "cancelation.h" +#include "cancelation_event.h" + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/cancelation/cancelation_event.h> + +#include <library/cpp/actors/core/actorsystem.h> + +namespace NKikimr { +namespace NGRpcService { + +void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* requestCtx, + NActors::TActorSystem* as) +{ + auto subscriber = ActorIdFromProto(ev->Record.GetSubscriber()); + auto tag = ev->Record.GetWakeupTag(); + requestCtx->SetClientLostAction([subscriber, tag, as]() { + as->Send(subscriber, new TEvents::TEvWakeup(tag)); + }); +} + +void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber, + ui64 wakeupTag, NActors::TActorSystem* as) +{ + as->Send(service, new TEvSubscribeGrpcCancel(subscriber, wakeupTag)); +} + +} +} diff --git a/ydb/core/grpc_services/cancelation/cancelation.h b/ydb/core/grpc_services/cancelation/cancelation.h new file mode 100644 index 00000000000..ffe13eaaf40 --- /dev/null +++ b/ydb/core/grpc_services/cancelation/cancelation.h @@ -0,0 +1,18 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr { +namespace NGRpcService { + +struct TEvSubscribeGrpcCancel; +class IRequestCtxMtSafe; + +void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* requestCtx, + NActors::TActorSystem* as); + +void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber, + ui64 wakeupTag, NActors::TActorSystem* as); + +} +} diff --git a/ydb/core/grpc_services/cancelation/cancelation_event.h b/ydb/core/grpc_services/cancelation/cancelation_event.h new file mode 100644 index 00000000000..6fa88008610 --- /dev/null +++ b/ydb/core/grpc_services/cancelation/cancelation_event.h @@ -0,0 +1,23 @@ +#pragma once + +#include <ydb/core/base/events.h> +#include <ydb/core/grpc_services/cancelation/protos/event.pb.h> + +namespace NKikimr { +namespace NGRpcService { + +enum EServiceId { + EvSubscribeGrpcCancel = EventSpaceBegin(TKikimrEvents::ES_GRPC_CANCELATION), +}; + +struct TEvSubscribeGrpcCancel : public TEventPB<TEvSubscribeGrpcCancel, NKikimrGRpcService::TEvSubscribeGrpcCancel, EvSubscribeGrpcCancel> { + TEvSubscribeGrpcCancel() = default; + TEvSubscribeGrpcCancel(const NActors::TActorId& subscriber, ui64 wakeupTag) { + ActorIdToProto(subscriber, Record.MutableSubscriber()); + Record.SetWakeupTag(wakeupTag); + } +}; + +} +} + diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.darwin.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.darwin.txt new file mode 100644 index 00000000000..50a7ff3e592 --- /dev/null +++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.darwin.txt @@ -0,0 +1,32 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(grpc_services-cancelation-protos) +target_link_libraries(grpc_services-cancelation-protos PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-protos + contrib-libs-protobuf +) +target_proto_messages(grpc_services-cancelation-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/protos/event.proto +) +target_proto_addincls(grpc_services-cancelation-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(grpc_services-cancelation-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..214d8ee77fb --- /dev/null +++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux-aarch64.txt @@ -0,0 +1,33 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(grpc_services-cancelation-protos) +target_link_libraries(grpc_services-cancelation-protos PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-protos + contrib-libs-protobuf +) +target_proto_messages(grpc_services-cancelation-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/protos/event.proto +) +target_proto_addincls(grpc_services-cancelation-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(grpc_services-cancelation-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux.txt new file mode 100644 index 00000000000..214d8ee77fb --- /dev/null +++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux.txt @@ -0,0 +1,33 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(grpc_services-cancelation-protos) +target_link_libraries(grpc_services-cancelation-protos PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-protos + contrib-libs-protobuf +) +target_proto_messages(grpc_services-cancelation-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/protos/event.proto +) +target_proto_addincls(grpc_services-cancelation-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(grpc_services-cancelation-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.txt new file mode 100644 index 00000000000..3e0811fb22e --- /dev/null +++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/grpc_services/cancelation/protos/event.proto b/ydb/core/grpc_services/cancelation/protos/event.proto new file mode 100644 index 00000000000..00eae69fd94 --- /dev/null +++ b/ydb/core/grpc_services/cancelation/protos/event.proto @@ -0,0 +1,9 @@ +import "library/cpp/actors/protos/actors.proto"; + +package NKikimrGRpcService; + +message TEvSubscribeGrpcCancel { + optional NActorsProto.TActorId Subscriber = 1; + optional uint64 WakeupTag = 2; +}; + diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h index 2b871d3db5d..aa3130434f1 100644 --- a/ydb/core/grpc_services/rpc_deferrable.h +++ b/ydb/core/grpc_services/rpc_deferrable.h @@ -2,6 +2,8 @@ #include "defs.h" #include "grpc_request_proxy.h" +#include "cancelation/cancelation.h" +#include "cancelation/cancelation_event.h" #include "rpc_common.h" #include <ydb/core/tx/tx_proxy/proxy.h> @@ -166,6 +168,7 @@ protected: switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvWakeup, HandleWakeup); HFunc(TRpcServices::TEvForgetOperation, HandleForget); + hFunc(TEvSubscribeGrpcCancel, HandleSubscribeiGrpcCancel); default: { NYql::TIssues issues; issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, @@ -259,6 +262,11 @@ protected: Y_UNUSED(ev); static_cast<TDerived*>(this)->OnForgetOperation(ctx); } +private: + void HandleSubscribeiGrpcCancel(TEvSubscribeGrpcCancel::TPtr& ev) { + auto as = TActivationContext::ActorSystem(); + PassSubscription(ev->Get(), Request_.get(), as); + } }; } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index f97f36fcd04..7d04f7cec55 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -179,7 +179,7 @@ public: } } - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(Request_, SerializeQueryRequest); + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(Request_, SerializeQueryRequest, SelfId()); ev->PrepareRemote(); ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt index 980d4ba9385..b1ffca64c49 100644 --- a/ydb/core/kqp/common/CMakeLists.darwin.txt +++ b/ydb/core/kqp/common/CMakeLists.darwin.txt @@ -24,6 +24,7 @@ target_link_libraries(core-kqp-common PUBLIC yql-dq-actors yql-dq-common parser-pg_wrapper-interface + core-grpc_services-cancelation library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt index 00188567192..6f0724c4474 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt @@ -25,6 +25,7 @@ target_link_libraries(core-kqp-common PUBLIC yql-dq-actors yql-dq-common parser-pg_wrapper-interface + core-grpc_services-cancelation library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt index 00188567192..6f0724c4474 100644 --- a/ydb/core/kqp/common/CMakeLists.linux.txt +++ b/ydb/core/kqp/common/CMakeLists.linux.txt @@ -25,6 +25,7 @@ target_link_libraries(core-kqp-common PUBLIC yql-dq-actors yql-dq-common parser-pg_wrapper-interface + core-grpc_services-cancelation library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index ad993bd4c78..7e7fe78adc9 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -7,6 +7,7 @@ #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/cancelation/cancelation.h> #include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -254,10 +255,13 @@ struct TEvKqp { struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> { public: using TSerializerCb = void (*)(std::shared_ptr<NGRpcService::IRequestCtxMtSafe>&, NKikimrKqp::TEvQueryRequest*) noexcept; - TEvQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, TSerializerCb cb) + TEvQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, TSerializerCb cb, TActorId actorId) : RequestCtx(ctx) , SerializerCb(cb) - { } + { + ActorIdToProto(actorId, Record.MutableCancelationActor()); + } + TEvQueryRequest() = default; @@ -287,6 +291,17 @@ struct TEvKqp { return req; } + void SetClientLostAction(TActorId actorId, ui64 wakeupTag, NActors::TActorSystem* as) { + if (RequestCtx) { + RequestCtx->SetClientLostAction([actorId, wakeupTag, as]() { + as->Send(actorId, new TEvents::TEvWakeup(wakeupTag)); + }); + } else if (Record.HasCancelationActor()) { + auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor()); + NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, wakeupTag, as); + } + } + void PrepareRemote() const { if (RequestCtx) { Y_VERIFY(SerializerCb); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 74157a445e6..cf715513f11 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -207,6 +207,10 @@ private: TKqpSessionActor* This; }; +enum EWakeupTag { + ClientLost, +}; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_SESSION_ACTOR; @@ -385,6 +389,16 @@ public: } } + void HandleWakeup(TEvents::TEvWakeup::TPtr &ev) { + switch ((EWakeupTag) ev->Get()->Tag) { + case EWakeupTag::ClientLost: + Cleanup(); + break; + default: + YQL_ENSURE(false, "Unexpected Wakeup tag for HandleWakeup: " << (ui64) ev->Get()->Tag); + } + } + void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) { ui64 proxyRequestId = ev->Cookie; auto& event = ev->Get()->Record; @@ -467,6 +481,10 @@ public: QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId()); QueryState->KeepSession = Settings.LongSession || queryRequest.GetKeepSession(); + auto selfId = SelfId(); + auto as = TActivationContext::ActorSystem(); + ev->Get()->SetClientLostAction(selfId, EWakeupTag::ClientLost, as); + switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: case NKikimrKqp::QUERY_ACTION_PREPARE: @@ -2073,6 +2091,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); + hFunc(TEvents::TEvWakeup, HandleWakeup); default: UnexpectedEvent("CompileState", ev); } @@ -2101,6 +2120,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); + hFunc(TEvents::TEvWakeup, HandleWakeup); // always come from WorkerActor hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); @@ -2128,6 +2148,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); + hFunc(TEvents::TEvWakeup, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 45951e4f6a4..b971cc1db6b 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -240,9 +240,18 @@ Y_UNIT_TEST_SUITE(KqpQuery) { NDataShard::gSkipRepliesFailPoint.Disable(); - // Check session is ready or busy - result = session.ExecuteDataQuery(query, txControl).ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString()); + const TInstant start = TInstant::Now(); + // Check session is ready or busy, but eventualy must be ready + while (true) { + result = session.ExecuteDataQuery(query, txControl).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString()); + if (result.GetStatus() == EStatus::SUCCESS) { + break; + } + UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after timeout status"); + // Do not fire too much CPU + Sleep(TDuration::MilliSeconds(10)); + } } Y_UNIT_TEST(QueryTimeoutImmediate) { @@ -307,6 +316,19 @@ Y_UNIT_TEST_SUITE(KqpQuery) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED); NDataShard::gSkipRepliesFailPoint.Disable(); + + const TInstant start = TInstant::Now(); + while (true) { + result = session.ExecuteDataQuery(query, txControl).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString()); + if (result.GetStatus() == EStatus::SUCCESS) { + break; + } + + UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after client lost"); + // Do not fire too much CPU + Sleep(TDuration::MilliSeconds(10)); + } } TString line; diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 4e595940379..e2724f899d8 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -158,6 +158,7 @@ message TEvQueryRequest { repeated Ydb.Issue.IssueMessage QueryIssues = 7; optional Ydb.StatusIds.StatusCode YdbStatus = 8; + optional NActorsProto.TActorId CancelationActor = 9; } message TMkqlProfile { |