diff options
author | dcherednik <[email protected]> | 2023-01-19 14:16:20 +0300 |
---|---|---|
committer | dcherednik <[email protected]> | 2023-01-19 14:16:20 +0300 |
commit | dff1e2c3d473c7b3e6ec749ae02f3a825c029488 (patch) | |
tree | a1bb2b3520c464b4b0c2bb53aa9431ac9991e571 | |
parent | 29ff5da49e72743e9b37dbbc20ebf791999301f8 (diff) |
Do not use TEvWakeup for remote notification.
20 files changed, 41 insertions, 36 deletions
diff --git a/ydb/core/grpc_services/cancelation/cancelation.cpp b/ydb/core/grpc_services/cancelation/cancelation.cpp index 4b12dc9b53f..37136078594 100644 --- a/ydb/core/grpc_services/cancelation/cancelation.cpp +++ b/ydb/core/grpc_services/cancelation/cancelation.cpp @@ -13,16 +13,15 @@ void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* reque NActors::TActorSystem* as) { auto subscriber = ActorIdFromProto(ev->Record.GetSubscriber()); - auto tag = ev->Record.GetWakeupTag(); - requestCtx->SetClientLostAction([subscriber, tag, as]() { - as->Send(subscriber, new TEvents::TEvWakeup(tag)); + requestCtx->SetClientLostAction([subscriber, as]() { + as->Send(subscriber, new TEvClientLost); }); } void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber, - ui64 wakeupTag, NActors::TActorSystem* as) + NActors::TActorSystem* as) { - as->Send(service, new TEvSubscribeGrpcCancel(subscriber, wakeupTag)); + as->Send(service, new TEvSubscribeGrpcCancel(subscriber)); } } diff --git a/ydb/core/grpc_services/cancelation/cancelation.h b/ydb/core/grpc_services/cancelation/cancelation.h index ffe13eaaf40..fe4fe253ef6 100644 --- a/ydb/core/grpc_services/cancelation/cancelation.h +++ b/ydb/core/grpc_services/cancelation/cancelation.h @@ -12,7 +12,7 @@ void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* reque NActors::TActorSystem* as); void SubscribeRemoteCancel(const NActors::TActorId& service, const NActors::TActorId& subscriber, - ui64 wakeupTag, NActors::TActorSystem* as); + NActors::TActorSystem* as); } } diff --git a/ydb/core/grpc_services/cancelation/cancelation_event.h b/ydb/core/grpc_services/cancelation/cancelation_event.h index 6fa88008610..c814373f617 100644 --- a/ydb/core/grpc_services/cancelation/cancelation_event.h +++ b/ydb/core/grpc_services/cancelation/cancelation_event.h @@ -8,16 +8,20 @@ namespace NGRpcService { enum EServiceId { EvSubscribeGrpcCancel = EventSpaceBegin(TKikimrEvents::ES_GRPC_CANCELATION), + EvClientLost }; struct TEvSubscribeGrpcCancel : public TEventPB<TEvSubscribeGrpcCancel, NKikimrGRpcService::TEvSubscribeGrpcCancel, EvSubscribeGrpcCancel> { TEvSubscribeGrpcCancel() = default; - TEvSubscribeGrpcCancel(const NActors::TActorId& subscriber, ui64 wakeupTag) { + TEvSubscribeGrpcCancel(const NActors::TActorId& subscriber) { ActorIdToProto(subscriber, Record.MutableSubscriber()); - Record.SetWakeupTag(wakeupTag); } }; +struct TEvClientLost : public TEventPB<TEvClientLost, NKikimrGRpcService::TEvClientLost, EvClientLost> { + TEvClientLost() = default; +}; + } } diff --git a/ydb/core/grpc_services/cancelation/protos/event.proto b/ydb/core/grpc_services/cancelation/protos/event.proto index 00eae69fd94..41537229194 100644 --- a/ydb/core/grpc_services/cancelation/protos/event.proto +++ b/ydb/core/grpc_services/cancelation/protos/event.proto @@ -4,6 +4,7 @@ package NKikimrGRpcService; message TEvSubscribeGrpcCancel { optional NActorsProto.TActorId Subscriber = 1; - optional uint64 WakeupTag = 2; }; +message TEvClientLost { +}; diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 04247a45f29..e4161d225a6 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -9,6 +9,7 @@ #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/cancelation/cancelation.h> +#include <ydb/core/grpc_services/cancelation/cancelation_event.h> #include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -310,14 +311,14 @@ struct TEvKqp { return req; } - void SetClientLostAction(TActorId actorId, ui64 wakeupTag, NActors::TActorSystem* as) { + void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) { if (RequestCtx) { - RequestCtx->SetClientLostAction([actorId, wakeupTag, as]() { - as->Send(actorId, new TEvents::TEvWakeup(wakeupTag)); + RequestCtx->SetClientLostAction([actorId, as]() { + as->Send(actorId, new NGRpcService::TEvClientLost()); }); } else if (Record.HasCancelationActor()) { auto cancelationActor = ActorIdFromProto(Record.GetCancelationActor()); - NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, wakeupTag, as); + NGRpcService::SubscribeRemoteCancel(cancelationActor, actorId, as); } } diff --git a/ydb/core/kqp/gateway/CMakeLists.darwin.txt b/ydb/core/kqp/gateway/CMakeLists.darwin.txt index a2b7ff3564d..ca44b59ecb7 100644 --- a/ydb/core/kqp/gateway/CMakeLists.darwin.txt +++ b/ydb/core/kqp/gateway/CMakeLists.darwin.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-gateway PUBLIC cpp-actors-core ydb-core-actorlib_impl ydb-core-base + core-kqp-common providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE diff --git a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt index b68d9fc9930..a70cfb046b1 100644 --- a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-gateway PUBLIC cpp-actors-core ydb-core-actorlib_impl ydb-core-base + core-kqp-common providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE diff --git a/ydb/core/kqp/gateway/CMakeLists.linux.txt b/ydb/core/kqp/gateway/CMakeLists.linux.txt index b68d9fc9930..a70cfb046b1 100644 --- a/ydb/core/kqp/gateway/CMakeLists.linux.txt +++ b/ydb/core/kqp/gateway/CMakeLists.linux.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-gateway PUBLIC cpp-actors-core ydb-core-actorlib_impl ydb-core-base + core-kqp-common providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index c8ba81c82ae..51eb2197573 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -9,7 +9,6 @@ #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/dq/common/dq_value.h> #include <ydb/core/kqp/topics/kqp_topics.h> -#include <ydb/core/kqp/common/kqp_prepared_query.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/provider/yql_kikimr_gateway.h> #include <ydb/core/kqp/provider/yql_kikimr_settings.h> diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 18189c042c0..3ac3dffa5d3 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -325,10 +325,6 @@ private: TKqpSessionActor* This; }; -enum EWakeupTag { - ClientLost, -}; - public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_SESSION_ACTOR; @@ -514,22 +510,16 @@ public: } } - void HandleWakeup(TEvents::TEvWakeup::TPtr &ev) { - switch ((EWakeupTag) ev->Get()->Tag) { - case EWakeupTag::ClientLost: - LOG_D("Got TEvWakeup ClientLost event, send AbortExecution to executor: " - << ExecuterId); + void HandleClientLost(NGRpcService::TEvClientLost::TPtr&) { + LOG_D("Got ClientLost event, send AbortExecution to executor: " + << ExecuterId); - if (ExecuterId) { - auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here + if (ExecuterId) { + auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here - Send(ExecuterId, abortEv.Release()); - } - Cleanup(); - break; - default: - YQL_ENSURE(false, "Unexpected Wakeup tag for HandleWakeup: " << (ui64) ev->Get()->Tag); + Send(ExecuterId, abortEv.Release()); } + Cleanup(); } void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) { @@ -607,7 +597,7 @@ public: auto selfId = SelfId(); auto as = TActivationContext::ActorSystem(); - ev->Get()->SetClientLostAction(selfId, EWakeupTag::ClientLost, as); + ev->Get()->SetClientLostAction(selfId, as); switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: @@ -2158,7 +2148,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse); - hFunc(TEvents::TEvWakeup, HandleWakeup); + hFunc(NGRpcService::TEvClientLost, HandleClientLost); default: UnexpectedEvent("CompileState", ev); } @@ -2185,7 +2175,7 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvents::TEvWakeup, HandleWakeup); + hFunc(NGRpcService::TEvClientLost, HandleClientLost); // always come from WorkerActor hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); @@ -2211,7 +2201,7 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvents::TEvWakeup, HandleNoop); + hFunc(NGRpcService::TEvClientLost, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); diff --git a/ydb/core/load_test/CMakeLists.darwin.txt b/ydb/core/load_test/CMakeLists.darwin.txt index 2347724689f..7022fd47dbb 100644 --- a/ydb/core/load_test/CMakeLists.darwin.txt +++ b/ydb/core/load_test/CMakeLists.darwin.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-core-load_test PUBLIC core-blobstorage-pdisk ydb-core-control ydb-core-keyvalue + core-kqp-common ydb-library-workload public-lib-base public-lib-operation_id diff --git a/ydb/core/load_test/CMakeLists.linux-aarch64.txt b/ydb/core/load_test/CMakeLists.linux-aarch64.txt index 0968220b4d5..a7ece616e2b 100644 --- a/ydb/core/load_test/CMakeLists.linux-aarch64.txt +++ b/ydb/core/load_test/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-load_test PUBLIC core-blobstorage-pdisk ydb-core-control ydb-core-keyvalue + core-kqp-common ydb-library-workload public-lib-base public-lib-operation_id diff --git a/ydb/core/load_test/CMakeLists.linux.txt b/ydb/core/load_test/CMakeLists.linux.txt index 0968220b4d5..a7ece616e2b 100644 --- a/ydb/core/load_test/CMakeLists.linux.txt +++ b/ydb/core/load_test/CMakeLists.linux.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-load_test PUBLIC core-blobstorage-pdisk ydb-core-control ydb-core-keyvalue + core-kqp-common ydb-library-workload public-lib-base public-lib-operation_id diff --git a/ydb/core/persqueue/CMakeLists.darwin.txt b/ydb/core/persqueue/CMakeLists.darwin.txt index 7b9926052f1..148966b32e6 100644 --- a/ydb/core/persqueue/CMakeLists.darwin.txt +++ b/ydb/core/persqueue/CMakeLists.darwin.txt @@ -24,6 +24,7 @@ target_link_libraries(ydb-core-persqueue PUBLIC ydb-core-base core-engine-minikql ydb-core-keyvalue + core-kqp-common ydb-core-metering core-persqueue-codecs core-persqueue-config diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index fce2e6ac157..c58694fa222 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-core-persqueue PUBLIC ydb-core-base core-engine-minikql ydb-core-keyvalue + core-kqp-common ydb-core-metering core-persqueue-codecs core-persqueue-config diff --git a/ydb/core/persqueue/CMakeLists.linux.txt b/ydb/core/persqueue/CMakeLists.linux.txt index fce2e6ac157..c58694fa222 100644 --- a/ydb/core/persqueue/CMakeLists.linux.txt +++ b/ydb/core/persqueue/CMakeLists.linux.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-core-persqueue PUBLIC ydb-core-base core-engine-minikql ydb-core-keyvalue + core-kqp-common ydb-core-metering core-persqueue-codecs core-persqueue-config diff --git a/ydb/core/ymq/base/CMakeLists.darwin.txt b/ydb/core/ymq/base/CMakeLists.darwin.txt index c3bde7ce873..f8d33c5fae7 100644 --- a/ydb/core/ymq/base/CMakeLists.darwin.txt +++ b/ydb/core/ymq/base/CMakeLists.darwin.txt @@ -24,6 +24,7 @@ target_link_libraries(core-ymq-base PUBLIC ydb-core-base ydb-core-protos core-ymq-proto + core-kqp-common ydb-library-aclib library-http_proxy-authorization library-http_proxy-error diff --git a/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt b/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt index 2fa2b6fdcfd..9cab4b5c305 100644 --- a/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt +++ b/ydb/core/ymq/base/CMakeLists.linux-aarch64.txt @@ -25,6 +25,7 @@ target_link_libraries(core-ymq-base PUBLIC ydb-core-base ydb-core-protos core-ymq-proto + core-kqp-common ydb-library-aclib library-http_proxy-authorization library-http_proxy-error diff --git a/ydb/core/ymq/base/CMakeLists.linux.txt b/ydb/core/ymq/base/CMakeLists.linux.txt index 2fa2b6fdcfd..9cab4b5c305 100644 --- a/ydb/core/ymq/base/CMakeLists.linux.txt +++ b/ydb/core/ymq/base/CMakeLists.linux.txt @@ -25,6 +25,7 @@ target_link_libraries(core-ymq-base PUBLIC ydb-core-base ydb-core-protos core-ymq-proto + core-kqp-common ydb-library-aclib library-http_proxy-authorization library-http_proxy-error diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 02178732b16..02b10d5affe 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -17,7 +17,6 @@ #include <ydb/public/lib/yson_value/ydb_yson_value.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> -#include <ydb/core/kqp/common/kqp.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/event_pb.h> |