diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-07 20:40:32 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-07 20:40:32 +0300 |
commit | ee1ab05c311bc7204bd88a00e9db01331a078961 (patch) | |
tree | 61a51760a0b54d4c29ee8d65ac643bf58691d3df | |
parent | 66db41e47f76c3f2902855ca1c84509f35e42ff2 (diff) | |
download | ydb-ee1ab05c311bc7204bd88a00e9db01331a078961.tar.gz |
Remove timeout from COMPUTE_ACTOR
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 83 |
1 files changed, 40 insertions, 43 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index eb6e70aac9d..e24d9df17fe 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -52,21 +52,6 @@ TString TasksIdsStr(const TTasksCollection& tasks) { class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> { using TBase = TActorBootstrapped<TKqpNodeService>; - struct TEvPrivate { - enum EEv { - EvTimeout = EventSpaceBegin(TEvents::ES_PRIVATE), - }; - - struct TEvTimeout : public TEventLocal<TEvTimeout, EEv::EvTimeout> { - const ui64 TxId; - const TActorId Requester; - - TEvTimeout(ui64 txId, const TActorId& requester) - : TxId(txId) - , Requester(requester) {} - }; - }; - public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_NODE_SERVICE; @@ -94,6 +79,7 @@ public: TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); } + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); Become(&TKqpNodeService::WorkState); } @@ -103,7 +89,7 @@ private: hFunc(TEvKqpNode::TEvStartKqpTasksRequest, HandleWork); hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork); hFunc(TEvKqpNode::TEvCancelKqpTasksRequest, HandleWork); - hFunc(TEvPrivate::TEvTimeout, HandleWork); + hFunc(TEvents::TEvWakeup, HandleWork); // misc hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleWork); hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, HandleWork); @@ -254,8 +240,11 @@ private: NYql::NDq::TComputeRuntimeSettings runtimeSettingsBase; auto& msgRtSettings = msg.GetRuntimeSettings(); if (msgRtSettings.GetTimeoutMs() > 0) { - runtimeSettingsBase.Timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs()); - request.Deadline = TAppData::TimeProvider->Now() + *runtimeSettingsBase.Timeout; + // compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer + auto timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs()); + request.Deadline = TAppData::TimeProvider->Now() + timeout + /* gap */ TDuration::Seconds(5); + + ExpiringRequests.emplace(request.Deadline, TRequestId{txId, requester}); } runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool; @@ -327,12 +316,6 @@ private: ActorIdToProto(taskCtx.ComputeActorId, startedTask->MutableActorId()); } - if (runtimeSettingsBase.Timeout) { - request.TimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), - *runtimeSettingsBase.Timeout + /* gap */ TDuration::Seconds(5), - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvTimeout(txId, requester))); - } - Send(request.Executer, reply.Release(), IEventHandle::FlagTrackDelivery, txId); State.NewRequest(txId, requester, std::move(request), memoryPool); @@ -351,8 +334,14 @@ private: if (request.InFlyTasks.empty()) { LOG_D("TxId: " << msg.TxId << ", requester: " << requester << " completed"); - if (request.TimeoutTimer) { - Send(request.TimeoutTimer, new TEvents::TEvPoison); + auto bounds = ExpiringRequests.equal_range(request.Deadline); + for (auto it = bounds.first; it != bounds.second; ) { + if (it->second.TxId == msg.TxId && it->second.Requester == requester) { + auto delIt = it++; + ExpiringRequests.erase(delIt); + } else { + ++it; + } } ResourceManager()->FreeResources(msg.TxId); @@ -388,10 +377,6 @@ private: State.RemoveTx(txId, [this, &txId, &reason](const NKqpNode::TTasksRequest& request) { LOG_D("TxId: " << txId << ", cancel granted resources"); - if (request.TimeoutTimer) { - Send(request.TimeoutTimer, new TEvents::TEvPoison); - } - ResourceManager()->FreeResources(txId); for (auto& [taskId, task] : request.InFlyTasks) { @@ -401,21 +386,28 @@ private: }); } - void HandleWork(TEvPrivate::TEvTimeout::TPtr& ev) { - ui64 txId = ev->Get()->TxId; - TActorId requester = ev->Get()->Requester; - - LOG_E("txId: " << txId << ", requester: " << requester << ", execution timeout"); + void HandleWork(TEvents::TEvWakeup::TPtr& ev) { + Schedule(TDuration::Seconds(1), ev->Release().Release()); + + auto it = ExpiringRequests.begin(); + auto now = TAppData::TimeProvider->Now(); + while (it != ExpiringRequests.end() && it->first < now) { + auto reqId = it->second; + auto delIt = it++; + ExpiringRequests.erase(delIt); + + auto request = State.RemoveRequest(reqId.TxId, reqId.Requester); + LOG_D("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", execution timeout, request: " << bool(request)); + if (!request) { + // it is ok since in most cases requests is finished by exlicit TEvAbortExecution from their Executer + LOG_I("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", unknown request"); + continue; + } - auto request = State.RemoveRequest(txId, requester); - if (!request) { - LOG_E("txId: " << txId << ", requester: " << requester << ", unknown request"); - return; + ResourceManager()->FreeResources(reqId.TxId); + // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order: + // KqpProxy -> SessionActor -> Executer -> ComputeActor } - - ResourceManager()->FreeResources(txId); - - // don't send to executer and compute actors, they have their own timers with smaller timeout } private: @@ -546,6 +538,11 @@ private: IKqpNodeComputeActorFactory* CaFactory; NRm::IKqpResourceManager* ResourceManager_ = nullptr; NKqpNode::TState State; + struct TRequestId { + ui64 TxId; + TActorId Requester; + }; + std::multimap<TInstant, TRequestId> ExpiringRequests; }; |