summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <[email protected]>2023-01-19 14:16:20 +0300
committerdcherednik <[email protected]>2023-01-19 14:16:20 +0300
commitdff1e2c3d473c7b3e6ec749ae02f3a825c029488 (patch)
treea1bb2b3520c464b4b0c2bb53aa9431ac9991e571
parent29ff5da49e72743e9b37dbbc20ebf791999301f8 (diff)
Do not use TEvWakeup for remote notification.
-rw-r--r--ydb/core/grpc_services/cancelation/cancelation.cpp9
-rw-r--r--ydb/core/grpc_services/cancelation/cancelation.h2
-rw-r--r--ydb/core/grpc_services/cancelation/cancelation_event.h8
-rw-r--r--ydb/core/grpc_services/cancelation/protos/event.proto3
-rw-r--r--ydb/core/kqp/common/kqp.h9
-rw-r--r--ydb/core/kqp/gateway/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/gateway/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h1
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp32
-rw-r--r--ydb/core/load_test/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/load_test/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/load_test/CMakeLists.linux.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux.txt1
-rw-r--r--ydb/core/ymq/base/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/ymq/base/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/ymq/base/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller.cpp1
20 files changed, 41 insertions, 36 deletions
diff --git a/ydb/core/grpc_services/cancelation/cancelation.cpp b/ydb/core/grpc_services/cancelation/cancelation.cpp
index 4b12dc9b53f..37136078594 100644
--- a/ydb/core/grpc_services/cancelation/cancelation.cpp
+++ b/ydb/core/grpc_services/cancelation/cancelation.cpp
@@ -13,16 +13,15 @@ void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* reque
NActors::TActorSystem* as)
{
auto subscriber = ActorIdFromProto(ev->Record.GetSubscriber());
- auto tag = ev->Record.GetWakeupTag();
- requestCtx->SetClientLostAction([subscriber, tag, as]() {
- as->Send(subscriber, new TEvents::TEvWakeup(tag));
+ requestCtx->SetClientLostAction([subscriber, as]() {
+ as->Send(subscriber, new TEvClientLost);
});
}
void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber,
- ui64 wakeupTag, NActors::TActorSystem* as)
+ NActors::TActorSystem* as)
{
- as->Send(service, new TEvSubscribeGrpcCancel(subscriber, wakeupTag));
+ as->Send(service, new TEvSubscribeGrpcCancel(subscriber));
}
}
diff --git a/ydb/core/grpc_services/cancelation/cancelation.h b/ydb/core/grpc_services/cancelation/cancelation.h
index ffe13eaaf40..fe4fe253ef6 100644
--- a/ydb/core/grpc_services/cancelation/cancelation.h
+++ b/ydb/core/grpc_services/cancelation/cancelation.h
@@ -12,7 +12,7 @@ void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* reque
NActors::TActorSystem* as);
void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber,
- ui64 wakeupTag, NActors::TActorSystem* as);
+ NActors::TActorSystem* as);
}
}
diff --git a/ydb/core/grpc_services/cancelation/cancelation_event.h b/ydb/core/grpc_services/cancelation/cancelation_event.h
index 6fa88008610..c814373f617 100644
--- a/ydb/core/grpc_services/cancelation/cancelation_event.h
+++ b/ydb/core/grpc_services/cancelation/cancelation_event.h
@@ -8,16 +8,20 @@ namespace NGRpcService {
enum EServiceId {
EvSubscribeGrpcCancel = EventSpaceBegin(TKikimrEvents::ES_GRPC_CANCELATION),
+ EvClientLost
};
struct TEvSubscribeGrpcCancel : public TEventPB<TEvSubscribeGrpcCancel, NKikimrGRpcService::TEvSubscribeGrpcCancel, EvSubscribeGrpcCancel> {
TEvSubscribeGrpcCancel() = default;
- TEvSubscribeGrpcCancel(const NActors::TActorId& subscriber, ui64 wakeupTag) {
+ TEvSubscribeGrpcCancel(const NActors::TActorId& subscriber) {
ActorIdToProto(subscriber, Record.MutableSubscriber());
- Record.SetWakeupTag(wakeupTag);
}
};
+struct TEvClientLost : public TEventPB<TEvClientLost, NKikimrGRpcService::TEvClientLost, EvClientLost> {
+ TEvClientLost() = default;
+};
+
}
}
diff --git a/ydb/core/grpc_services/cancelation/protos/event.proto b/ydb/core/grpc_services/cancelation/protos/event.proto
index 00eae69fd94..41537229194 100644
--- a/ydb/core/grpc_services/cancelation/protos/event.proto
+++ b/ydb/core/grpc_services/cancelation/protos/event.proto
@@ -4,6 +4,7 @@ package NKikimrGRpcService;
message TEvSubscribeGrpcCancel {
optional NActorsProto.TActorId Subscriber = 1;
- optional uint64 WakeupTag = 2;
};
+message TEvClientLost {
+};
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 04247a45f29..e4161d225a6 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -9,6 +9,7 @@
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/cancelation/cancelation.h>
+#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -310,14 +311,14 @@ struct TEvKqp {
return req;
}
- void SetClientLostAction(TActorId actorId, ui64 wakeupTag, NActors::TActorSystem* as) {
+ void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) {
if (RequestCtx) {
- RequestCtx->SetClientLostAction([actorId, wakeupTag, as]() {
- as->Send(actorId, new TEvents::TEvWakeup(wakeupTag));
+ RequestCtx->SetClientLostAction([actorId, as]() {
+ as->Send(actorId, new NGRpcService::TEvClientLost());
});
} else if (Record.HasCancelationActor()) {
auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor());
- NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, wakeupTag, as);
+ NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, as);
}
}
diff --git a/ydb/core/kqp/gateway/CMakeLists.darwin.txt b/ydb/core/kqp/gateway/CMakeLists.darwin.txt
index a2b7ff3564d..ca44b59ecb7 100644
--- a/ydb/core/kqp/gateway/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/gateway/CMakeLists.darwin.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-gateway PUBLIC
cpp-actors-core
ydb-core-actorlib_impl
ydb-core-base
+ core-kqp-common
providers-result-expr_nodes
)
target_sources(core-kqp-gateway PRIVATE
diff --git a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt
index b68d9fc9930..a70cfb046b1 100644
--- a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-kqp-gateway PUBLIC
cpp-actors-core
ydb-core-actorlib_impl
ydb-core-base
+ core-kqp-common
providers-result-expr_nodes
)
target_sources(core-kqp-gateway PRIVATE
diff --git a/ydb/core/kqp/gateway/CMakeLists.linux.txt b/ydb/core/kqp/gateway/CMakeLists.linux.txt
index b68d9fc9930..a70cfb046b1 100644
--- a/ydb/core/kqp/gateway/CMakeLists.linux.txt
+++ b/ydb/core/kqp/gateway/CMakeLists.linux.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-kqp-gateway PUBLIC
cpp-actors-core
ydb-core-actorlib_impl
ydb-core-base
+ core-kqp-common
providers-result-expr_nodes
)
target_sources(core-kqp-gateway PRIVATE
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index c8ba81c82ae..51eb2197573 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -9,7 +9,6 @@
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/dq/common/dq_value.h>
#include <ydb/core/kqp/topics/kqp_topics.h>
-#include <ydb/core/kqp/common/kqp_prepared_query.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 18189c042c0..3ac3dffa5d3 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -325,10 +325,6 @@ private:
TKqpSessionActor* This;
};
-enum EWakeupTag {
- ClientLost,
-};
-
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_SESSION_ACTOR;
@@ -514,22 +510,16 @@ public:
}
}
- void HandleWakeup(TEvents::TEvWakeup::TPtr &ev) {
- switch ((EWakeupTag) ev->Get()->Tag) {
- case EWakeupTag::ClientLost:
- LOG_D("Got TEvWakeup ClientLost event, send AbortExecution to executor: "
- << ExecuterId);
+ void HandleClientLost(NGRpcService::TEvClientLost::TPtr&) {
+ LOG_D("Got ClientLost event, send AbortExecution to executor: "
+ << ExecuterId);
- if (ExecuterId) {
- auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here
+ if (ExecuterId) {
+ auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here
- Send(ExecuterId, abortEv.Release());
- }
- Cleanup();
- break;
- default:
- YQL_ENSURE(false, "Unexpected Wakeup tag for HandleWakeup: " << (ui64) ev->Get()->Tag);
+ Send(ExecuterId, abortEv.Release());
}
+ Cleanup();
}
void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) {
@@ -607,7 +597,7 @@ public:
auto selfId = SelfId();
auto as = TActivationContext::ActorSystem();
- ev->Get()->SetClientLostAction(selfId, EWakeupTag::ClientLost, as);
+ ev->Get()->SetClientLostAction(selfId, as);
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
@@ -2158,7 +2148,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse);
- hFunc(TEvents::TEvWakeup, HandleWakeup);
+ hFunc(NGRpcService::TEvClientLost, HandleClientLost);
default:
UnexpectedEvent("CompileState", ev);
}
@@ -2185,7 +2175,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvents::TEvWakeup, HandleWakeup);
+ hFunc(NGRpcService::TEvClientLost, HandleClientLost);
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
@@ -2211,7 +2201,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvents::TEvWakeup, HandleNoop);
+ hFunc(NGRpcService::TEvClientLost, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
diff --git a/ydb/core/load_test/CMakeLists.darwin.txt b/ydb/core/load_test/CMakeLists.darwin.txt
index 2347724689f..7022fd47dbb 100644
--- a/ydb/core/load_test/CMakeLists.darwin.txt
+++ b/ydb/core/load_test/CMakeLists.darwin.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-core-load_test PUBLIC
core-blobstorage-pdisk
ydb-core-control
ydb-core-keyvalue
+ core-kqp-common
ydb-library-workload
public-lib-base
public-lib-operation_id
diff --git a/ydb/core/load_test/CMakeLists.linux-aarch64.txt b/ydb/core/load_test/CMakeLists.linux-aarch64.txt
index 0968220b4d5..a7ece616e2b 100644
--- a/ydb/core/load_test/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/load_test/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-load_test PUBLIC
core-blobstorage-pdisk
ydb-core-control
ydb-core-keyvalue
+ core-kqp-common
ydb-library-workload
public-lib-base
public-lib-operation_id
diff --git a/ydb/core/load_test/CMakeLists.linux.txt b/ydb/core/load_test/CMakeLists.linux.txt
index 0968220b4d5..a7ece616e2b 100644
--- a/ydb/core/load_test/CMakeLists.linux.txt
+++ b/ydb/core/load_test/CMakeLists.linux.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-load_test PUBLIC
core-blobstorage-pdisk
ydb-core-control
ydb-core-keyvalue
+ core-kqp-common
ydb-library-workload
public-lib-base
public-lib-operation_id
diff --git a/ydb/core/persqueue/CMakeLists.darwin.txt b/ydb/core/persqueue/CMakeLists.darwin.txt
index 7b9926052f1..148966b32e6 100644
--- a/ydb/core/persqueue/CMakeLists.darwin.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin.txt
@@ -24,6 +24,7 @@ target_link_libraries(ydb-core-persqueue PUBLIC
ydb-core-base
core-engine-minikql
ydb-core-keyvalue
+ core-kqp-common
ydb-core-metering
core-persqueue-codecs
core-persqueue-config
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index fce2e6ac157..c58694fa222 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-core-persqueue PUBLIC
ydb-core-base
core-engine-minikql
ydb-core-keyvalue
+ core-kqp-common
ydb-core-metering
core-persqueue-codecs
core-persqueue-config
diff --git a/ydb/core/persqueue/CMakeLists.linux.txt b/ydb/core/persqueue/CMakeLists.linux.txt
index fce2e6ac157..c58694fa222 100644
--- a/ydb/core/persqueue/CMakeLists.linux.txt
+++ b/ydb/core/persqueue/CMakeLists.linux.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-core-persqueue PUBLIC
ydb-core-base
core-engine-minikql
ydb-core-keyvalue
+ core-kqp-common
ydb-core-metering
core-persqueue-codecs
core-persqueue-config
diff --git a/ydb/core/ymq/base/CMakeLists.darwin.txt b/ydb/core/ymq/base/CMakeLists.darwin.txt
index c3bde7ce873..f8d33c5fae7 100644
--- a/ydb/core/ymq/base/CMakeLists.darwin.txt
+++ b/ydb/core/ymq/base/CMakeLists.darwin.txt
@@ -24,6 +24,7 @@ target_link_libraries(core-ymq-base PUBLIC
ydb-core-base
ydb-core-protos
core-ymq-proto
+ core-kqp-common
ydb-library-aclib
library-http_proxy-authorization
library-http_proxy-error
diff --git a/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt b/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt
index 2fa2b6fdcfd..9cab4b5c305 100644
--- a/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-ymq-base PUBLIC
ydb-core-base
ydb-core-protos
core-ymq-proto
+ core-kqp-common
ydb-library-aclib
library-http_proxy-authorization
library-http_proxy-error
diff --git a/ydb/core/ymq/base/CMakeLists.linux.txt b/ydb/core/ymq/base/CMakeLists.linux.txt
index 2fa2b6fdcfd..9cab4b5c305 100644
--- a/ydb/core/ymq/base/CMakeLists.linux.txt
+++ b/ydb/core/ymq/base/CMakeLists.linux.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-ymq-base PUBLIC
ydb-core-base
ydb-core-protos
core-ymq-proto
+ core-kqp-common
ydb-library-aclib
library-http_proxy-authorization
library-http_proxy-error
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index 02178732b16..02b10d5affe 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -17,7 +17,6 @@
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
-#include <ydb/core/kqp/common/kqp.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/event_pb.h>