aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2022-12-19 18:21:27 +0300
committerdcherednik <dcherednik@ydb.tech>2022-12-19 18:21:27 +0300
commit25a02bcefbd499da8e6be48f161e58354e63cc3c (patch)
treeebe4fc3977764d273187dcaea4575c496869a559
parentbc9bfa700f812cea3b635c19f46b6db627b6d41e (diff)
downloadydb-25a02bcefbd499da8e6be48f161e58354e63cc3c.tar.gz
Handle clent timeout in session actor.
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux.txt2
-rw-r--r--ydb/core/grpc_services/base/base.h3
-rw-r--r--ydb/core/grpc_services/cancelation/CMakeLists.darwin.txt21
-rw-r--r--ydb/core/grpc_services/cancelation/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/grpc_services/cancelation/CMakeLists.linux.txt22
-rw-r--r--ydb/core/grpc_services/cancelation/CMakeLists.txt15
-rw-r--r--ydb/core/grpc_services/cancelation/cancelation.cpp29
-rw-r--r--ydb/core/grpc_services/cancelation/cancelation.h18
-rw-r--r--ydb/core/grpc_services/cancelation/cancelation_event.h23
-rw-r--r--ydb/core/grpc_services/cancelation/protos/CMakeLists.darwin.txt32
-rw-r--r--ydb/core/grpc_services/cancelation/protos/CMakeLists.linux-aarch64.txt33
-rw-r--r--ydb/core/grpc_services/cancelation/protos/CMakeLists.linux.txt33
-rw-r--r--ydb/core/grpc_services/cancelation/protos/CMakeLists.txt15
-rw-r--r--ydb/core/grpc_services/cancelation/protos/event.proto9
-rw-r--r--ydb/core/grpc_services/rpc_deferrable.h8
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp2
-rw-r--r--ydb/core/kqp/common/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/common/kqp.h19
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp21
-rw-r--r--ydb/core/kqp/ut/query/kqp_query_ut.cpp28
-rw-r--r--ydb/core/protos/kqp.proto1
26 files changed, 357 insertions, 7 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index a3bdbca2a40..61239219170 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -157,6 +157,7 @@ struct TKikimrEvents : TEvents {
ES_METADATA_MANAGER,
ES_METADATA_SECRET,
ES_TEST_LOAD,
+ ES_GRPC_CANCELATION,
};
};
diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt
index bfbb8681c97..e42e68aba8d 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin.txt
@@ -7,6 +7,7 @@
add_subdirectory(base)
+add_subdirectory(cancelation)
add_subdirectory(counters)
add_subdirectory(local_rpc)
add_subdirectory(ut)
@@ -28,6 +29,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-formats
core-grpc_services-counters
core-grpc_services-local_rpc
+ core-grpc_services-cancelation
ydb-core-health_check
ydb-core-io_formats
core-kesus-tablet
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index 882edf3557e..6d1930bcca9 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -7,6 +7,7 @@
add_subdirectory(base)
+add_subdirectory(cancelation)
add_subdirectory(counters)
add_subdirectory(local_rpc)
add_subdirectory(ut)
@@ -29,6 +30,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-formats
core-grpc_services-counters
core-grpc_services-local_rpc
+ core-grpc_services-cancelation
ydb-core-health_check
ydb-core-io_formats
core-kesus-tablet
diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt
index 882edf3557e..6d1930bcca9 100644
--- a/ydb/core/grpc_services/CMakeLists.linux.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux.txt
@@ -7,6 +7,7 @@
add_subdirectory(base)
+add_subdirectory(cancelation)
add_subdirectory(counters)
add_subdirectory(local_rpc)
add_subdirectory(ut)
@@ -29,6 +30,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-formats
core-grpc_services-counters
core-grpc_services-local_rpc
+ core-grpc_services-cancelation
ydb-core-health_check
ydb-core-io_formats
core-kesus-tablet
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 722ed130213..197368c7a0a 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -386,6 +386,8 @@ public:
virtual ~IRequestCtxMtSafe() = default;
virtual const google::protobuf::Message* GetRequest() const = 0;
virtual const TMaybe<TString> GetRequestType() const = 0;
+ // Implementation must be thread safe
+ virtual void SetClientLostAction(std::function<void()>&& cb) = 0;
};
// Request context
@@ -397,7 +399,6 @@ class IRequestCtx
friend class TProtoResponseHelper;
public:
- virtual void SetClientLostAction(std::function<void()>&& cb) = 0;
virtual google::protobuf::Message* GetRequestMut() = 0;
virtual google::protobuf::Arena* GetArena() = 0;
diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.darwin.txt b/ydb/core/grpc_services/cancelation/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..c9cab09c433
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/CMakeLists.darwin.txt
@@ -0,0 +1,21 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(protos)
+
+add_library(core-grpc_services-cancelation)
+target_link_libraries(core-grpc_services-cancelation PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ grpc_services-cancelation-protos
+ core-grpc_services-base
+ ydb-core-protos
+)
+target_sources(core-grpc_services-cancelation PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/cancelation.cpp
+)
diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/cancelation/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..d5cdddddd0f
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(protos)
+
+add_library(core-grpc_services-cancelation)
+target_link_libraries(core-grpc_services-cancelation PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ grpc_services-cancelation-protos
+ core-grpc_services-base
+ ydb-core-protos
+)
+target_sources(core-grpc_services-cancelation PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/cancelation.cpp
+)
diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.linux.txt b/ydb/core/grpc_services/cancelation/CMakeLists.linux.txt
new file mode 100644
index 00000000000..d5cdddddd0f
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/CMakeLists.linux.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(protos)
+
+add_library(core-grpc_services-cancelation)
+target_link_libraries(core-grpc_services-cancelation PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ grpc_services-cancelation-protos
+ core-grpc_services-base
+ ydb-core-protos
+)
+target_sources(core-grpc_services-cancelation PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/cancelation.cpp
+)
diff --git a/ydb/core/grpc_services/cancelation/CMakeLists.txt b/ydb/core/grpc_services/cancelation/CMakeLists.txt
new file mode 100644
index 00000000000..3e0811fb22e
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/grpc_services/cancelation/cancelation.cpp b/ydb/core/grpc_services/cancelation/cancelation.cpp
new file mode 100644
index 00000000000..4b12dc9b53f
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/cancelation.cpp
@@ -0,0 +1,29 @@
+#include "cancelation.h"
+#include "cancelation_event.h"
+
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
+
+#include <library/cpp/actors/core/actorsystem.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* requestCtx,
+ NActors::TActorSystem* as)
+{
+ auto subscriber = ActorIdFromProto(ev->Record.GetSubscriber());
+ auto tag = ev->Record.GetWakeupTag();
+ requestCtx->SetClientLostAction([subscriber, tag, as]() {
+ as->Send(subscriber, new TEvents::TEvWakeup(tag));
+ });
+}
+
+void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber,
+ ui64 wakeupTag, NActors::TActorSystem* as)
+{
+ as->Send(service, new TEvSubscribeGrpcCancel(subscriber, wakeupTag));
+}
+
+}
+}
diff --git a/ydb/core/grpc_services/cancelation/cancelation.h b/ydb/core/grpc_services/cancelation/cancelation.h
new file mode 100644
index 00000000000..ffe13eaaf40
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/cancelation.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+struct TEvSubscribeGrpcCancel;
+class IRequestCtxMtSafe;
+
+void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* requestCtx,
+ NActors::TActorSystem* as);
+
+void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber,
+ ui64 wakeupTag, NActors::TActorSystem* as);
+
+}
+}
diff --git a/ydb/core/grpc_services/cancelation/cancelation_event.h b/ydb/core/grpc_services/cancelation/cancelation_event.h
new file mode 100644
index 00000000000..6fa88008610
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/cancelation_event.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <ydb/core/base/events.h>
+#include <ydb/core/grpc_services/cancelation/protos/event.pb.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+enum EServiceId {
+ EvSubscribeGrpcCancel = EventSpaceBegin(TKikimrEvents::ES_GRPC_CANCELATION),
+};
+
+struct TEvSubscribeGrpcCancel : public TEventPB<TEvSubscribeGrpcCancel, NKikimrGRpcService::TEvSubscribeGrpcCancel, EvSubscribeGrpcCancel> {
+ TEvSubscribeGrpcCancel() = default;
+ TEvSubscribeGrpcCancel(const NActors::TActorId& subscriber, ui64 wakeupTag) {
+ ActorIdToProto(subscriber, Record.MutableSubscriber());
+ Record.SetWakeupTag(wakeupTag);
+ }
+};
+
+}
+}
+
diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.darwin.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..50a7ff3e592
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.darwin.txt
@@ -0,0 +1,32 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(grpc_services-cancelation-protos)
+target_link_libraries(grpc_services-cancelation-protos PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-protos
+ contrib-libs-protobuf
+)
+target_proto_messages(grpc_services-cancelation-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/protos/event.proto
+)
+target_proto_addincls(grpc_services-cancelation-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(grpc_services-cancelation-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..214d8ee77fb
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,33 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(grpc_services-cancelation-protos)
+target_link_libraries(grpc_services-cancelation-protos PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-protos
+ contrib-libs-protobuf
+)
+target_proto_messages(grpc_services-cancelation-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/protos/event.proto
+)
+target_proto_addincls(grpc_services-cancelation-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(grpc_services-cancelation-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux.txt
new file mode 100644
index 00000000000..214d8ee77fb
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.linux.txt
@@ -0,0 +1,33 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(grpc_services-cancelation-protos)
+target_link_libraries(grpc_services-cancelation-protos PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-protos
+ contrib-libs-protobuf
+)
+target_proto_messages(grpc_services-cancelation-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/cancelation/protos/event.proto
+)
+target_proto_addincls(grpc_services-cancelation-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(grpc_services-cancelation-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/grpc_services/cancelation/protos/CMakeLists.txt b/ydb/core/grpc_services/cancelation/protos/CMakeLists.txt
new file mode 100644
index 00000000000..3e0811fb22e
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/protos/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/grpc_services/cancelation/protos/event.proto b/ydb/core/grpc_services/cancelation/protos/event.proto
new file mode 100644
index 00000000000..00eae69fd94
--- /dev/null
+++ b/ydb/core/grpc_services/cancelation/protos/event.proto
@@ -0,0 +1,9 @@
+import "library/cpp/actors/protos/actors.proto";
+
+package NKikimrGRpcService;
+
+message TEvSubscribeGrpcCancel {
+ optional NActorsProto.TActorId Subscriber = 1;
+ optional uint64 WakeupTag = 2;
+};
+
diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h
index 2b871d3db5d..aa3130434f1 100644
--- a/ydb/core/grpc_services/rpc_deferrable.h
+++ b/ydb/core/grpc_services/rpc_deferrable.h
@@ -2,6 +2,8 @@
#include "defs.h"
#include "grpc_request_proxy.h"
+#include "cancelation/cancelation.h"
+#include "cancelation/cancelation_event.h"
#include "rpc_common.h"
#include <ydb/core/tx/tx_proxy/proxy.h>
@@ -166,6 +168,7 @@ protected:
switch (ev->GetTypeRewrite()) {
HFunc(TEvents::TEvWakeup, HandleWakeup);
HFunc(TRpcServices::TEvForgetOperation, HandleForget);
+ hFunc(TEvSubscribeGrpcCancel, HandleSubscribeiGrpcCancel);
default: {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
@@ -259,6 +262,11 @@ protected:
Y_UNUSED(ev);
static_cast<TDerived*>(this)->OnForgetOperation(ctx);
}
+private:
+ void HandleSubscribeiGrpcCancel(TEvSubscribeGrpcCancel::TPtr& ev) {
+ auto as = TActivationContext::ActorSystem();
+ PassSubscription(ev->Get(), Request_.get(), as);
+ }
};
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index f97f36fcd04..7d04f7cec55 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -179,7 +179,7 @@ public:
}
}
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(Request_, SerializeQueryRequest);
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(Request_, SerializeQueryRequest, SelfId());
ev->PrepareRemote();
ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt
index 980d4ba9385..b1ffca64c49 100644
--- a/ydb/core/kqp/common/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/common/CMakeLists.darwin.txt
@@ -24,6 +24,7 @@ target_link_libraries(core-kqp-common PUBLIC
yql-dq-actors
yql-dq-common
parser-pg_wrapper-interface
+ core-grpc_services-cancelation
library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
index 00188567192..6f0724c4474 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-kqp-common PUBLIC
yql-dq-actors
yql-dq-common
parser-pg_wrapper-interface
+ core-grpc_services-cancelation
library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt
index 00188567192..6f0724c4474 100644
--- a/ydb/core/kqp/common/CMakeLists.linux.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-kqp-common PUBLIC
yql-dq-actors
yql-dq-common
parser-pg_wrapper-interface
+ core-grpc_services-cancelation
library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index ad993bd4c78..7e7fe78adc9 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -7,6 +7,7 @@
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/cancelation/cancelation.h>
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -254,10 +255,13 @@ struct TEvKqp {
struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
public:
using TSerializerCb = void (*)(std::shared_ptr<NGRpcService::IRequestCtxMtSafe>&, NKikimrKqp::TEvQueryRequest*) noexcept;
- TEvQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, TSerializerCb cb)
+ TEvQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, TSerializerCb cb, TActorId actorId)
: RequestCtx(ctx)
, SerializerCb(cb)
- { }
+ {
+ ActorIdToProto(actorId, Record.MutableCancelationActor());
+ }
+
TEvQueryRequest() = default;
@@ -287,6 +291,17 @@ struct TEvKqp {
return req;
}
+ void SetClientLostAction(TActorId actorId, ui64 wakeupTag, NActors::TActorSystem* as) {
+ if (RequestCtx) {
+ RequestCtx->SetClientLostAction([actorId, wakeupTag, as]() {
+ as->Send(actorId, new TEvents::TEvWakeup(wakeupTag));
+ });
+ } else if (Record.HasCancelationActor()) {
+ auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor());
+ NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, wakeupTag, as);
+ }
+ }
+
void PrepareRemote() const {
if (RequestCtx) {
Y_VERIFY(SerializerCb);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 74157a445e6..cf715513f11 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -207,6 +207,10 @@ private:
TKqpSessionActor* This;
};
+enum EWakeupTag {
+ ClientLost,
+};
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_SESSION_ACTOR;
@@ -385,6 +389,16 @@ public:
}
}
+ void HandleWakeup(TEvents::TEvWakeup::TPtr &ev) {
+ switch ((EWakeupTag) ev->Get()->Tag) {
+ case EWakeupTag::ClientLost:
+ Cleanup();
+ break;
+ default:
+ YQL_ENSURE(false, "Unexpected Wakeup tag for HandleWakeup: " << (ui64) ev->Get()->Tag);
+ }
+ }
+
void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) {
ui64 proxyRequestId = ev->Cookie;
auto& event = ev->Get()->Record;
@@ -467,6 +481,10 @@ public:
QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId());
QueryState->KeepSession = Settings.LongSession || queryRequest.GetKeepSession();
+ auto selfId = SelfId();
+ auto as = TActivationContext::ActorSystem();
+ ev->Get()->SetClientLostAction(selfId, EWakeupTag::ClientLost, as);
+
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
case NKikimrKqp::QUERY_ACTION_PREPARE:
@@ -2073,6 +2091,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
+ hFunc(TEvents::TEvWakeup, HandleWakeup);
default:
UnexpectedEvent("CompileState", ev);
}
@@ -2101,6 +2120,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
+ hFunc(TEvents::TEvWakeup, HandleWakeup);
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
@@ -2128,6 +2148,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
+ hFunc(TEvents::TEvWakeup, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index 45951e4f6a4..b971cc1db6b 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -240,9 +240,18 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
NDataShard::gSkipRepliesFailPoint.Disable();
- // Check session is ready or busy
- result = session.ExecuteDataQuery(query, txControl).ExtractValueSync();
- UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString());
+ const TInstant start = TInstant::Now();
+ // Check session is ready or busy, but eventualy must be ready
+ while (true) {
+ result = session.ExecuteDataQuery(query, txControl).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString());
+ if (result.GetStatus() == EStatus::SUCCESS) {
+ break;
+ }
+ UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after timeout status");
+ // Do not fire too much CPU
+ Sleep(TDuration::MilliSeconds(10));
+ }
}
Y_UNIT_TEST(QueryTimeoutImmediate) {
@@ -307,6 +316,19 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED);
NDataShard::gSkipRepliesFailPoint.Disable();
+
+ const TInstant start = TInstant::Now();
+ while (true) {
+ result = session.ExecuteDataQuery(query, txControl).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString());
+ if (result.GetStatus() == EStatus::SUCCESS) {
+ break;
+ }
+
+ UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after client lost");
+ // Do not fire too much CPU
+ Sleep(TDuration::MilliSeconds(10));
+ }
}
TString line;
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 4e595940379..e2724f899d8 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -158,6 +158,7 @@ message TEvQueryRequest {
repeated Ydb.Issue.IssueMessage QueryIssues = 7;
optional Ydb.StatusIds.StatusCode YdbStatus = 8;
+ optional NActorsProto.TActorId CancelationActor = 9;
}
message TMkqlProfile {