aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-03-07 20:40:32 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-03-07 20:40:32 +0300
commitee1ab05c311bc7204bd88a00e9db01331a078961 (patch)
tree61a51760a0b54d4c29ee8d65ac643bf58691d3df
parent66db41e47f76c3f2902855ca1c84509f35e42ff2 (diff)
downloadydb-ee1ab05c311bc7204bd88a00e9db01331a078961.tar.gz
Remove timeout from COMPUTE_ACTOR
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp83
1 files changed, 40 insertions, 43 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index eb6e70aac9d..e24d9df17fe 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -52,21 +52,6 @@ TString TasksIdsStr(const TTasksCollection& tasks) {
class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
using TBase = TActorBootstrapped<TKqpNodeService>;
- struct TEvPrivate {
- enum EEv {
- EvTimeout = EventSpaceBegin(TEvents::ES_PRIVATE),
- };
-
- struct TEvTimeout : public TEventLocal<TEvTimeout, EEv::EvTimeout> {
- const ui64 TxId;
- const TActorId Requester;
-
- TEvTimeout(ui64 txId, const TActorId& requester)
- : TxId(txId)
- , Requester(requester) {}
- };
- };
-
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_NODE_SERVICE;
@@ -94,6 +79,7 @@ public:
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
}
+ Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
Become(&TKqpNodeService::WorkState);
}
@@ -103,7 +89,7 @@ private:
hFunc(TEvKqpNode::TEvStartKqpTasksRequest, HandleWork);
hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork);
hFunc(TEvKqpNode::TEvCancelKqpTasksRequest, HandleWork);
- hFunc(TEvPrivate::TEvTimeout, HandleWork);
+ hFunc(TEvents::TEvWakeup, HandleWork);
// misc
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleWork);
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, HandleWork);
@@ -254,8 +240,11 @@ private:
NYql::NDq::TComputeRuntimeSettings runtimeSettingsBase;
auto& msgRtSettings = msg.GetRuntimeSettings();
if (msgRtSettings.GetTimeoutMs() > 0) {
- runtimeSettingsBase.Timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs());
- request.Deadline = TAppData::TimeProvider->Now() + *runtimeSettingsBase.Timeout;
+ // compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer
+ auto timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs());
+ request.Deadline = TAppData::TimeProvider->Now() + timeout + /* gap */ TDuration::Seconds(5);
+
+ ExpiringRequests.emplace(request.Deadline, TRequestId{txId, requester});
}
runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool;
@@ -327,12 +316,6 @@ private:
ActorIdToProto(taskCtx.ComputeActorId, startedTask->MutableActorId());
}
- if (runtimeSettingsBase.Timeout) {
- request.TimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(),
- *runtimeSettingsBase.Timeout + /* gap */ TDuration::Seconds(5),
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvTimeout(txId, requester)));
- }
-
Send(request.Executer, reply.Release(), IEventHandle::FlagTrackDelivery, txId);
State.NewRequest(txId, requester, std::move(request), memoryPool);
@@ -351,8 +334,14 @@ private:
if (request.InFlyTasks.empty()) {
LOG_D("TxId: " << msg.TxId << ", requester: " << requester << " completed");
- if (request.TimeoutTimer) {
- Send(request.TimeoutTimer, new TEvents::TEvPoison);
+ auto bounds = ExpiringRequests.equal_range(request.Deadline);
+ for (auto it = bounds.first; it != bounds.second; ) {
+ if (it->second.TxId == msg.TxId && it->second.Requester == requester) {
+ auto delIt = it++;
+ ExpiringRequests.erase(delIt);
+ } else {
+ ++it;
+ }
}
ResourceManager()->FreeResources(msg.TxId);
@@ -388,10 +377,6 @@ private:
State.RemoveTx(txId, [this, &txId, &reason](const NKqpNode::TTasksRequest& request) {
LOG_D("TxId: " << txId << ", cancel granted resources");
- if (request.TimeoutTimer) {
- Send(request.TimeoutTimer, new TEvents::TEvPoison);
- }
-
ResourceManager()->FreeResources(txId);
for (auto& [taskId, task] : request.InFlyTasks) {
@@ -401,21 +386,28 @@ private:
});
}
- void HandleWork(TEvPrivate::TEvTimeout::TPtr& ev) {
- ui64 txId = ev->Get()->TxId;
- TActorId requester = ev->Get()->Requester;
-
- LOG_E("txId: " << txId << ", requester: " << requester << ", execution timeout");
+ void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
+ Schedule(TDuration::Seconds(1), ev->Release().Release());
+
+ auto it = ExpiringRequests.begin();
+ auto now = TAppData::TimeProvider->Now();
+ while (it != ExpiringRequests.end() && it->first < now) {
+ auto reqId = it->second;
+ auto delIt = it++;
+ ExpiringRequests.erase(delIt);
+
+ auto request = State.RemoveRequest(reqId.TxId, reqId.Requester);
+ LOG_D("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", execution timeout, request: " << bool(request));
+ if (!request) {
+ // it is ok since in most cases requests are finished by an explicit TEvAbortExecution from their Executer
+ LOG_I("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", unknown request");
+ continue;
+ }
- auto request = State.RemoveRequest(txId, requester);
- if (!request) {
- LOG_E("txId: " << txId << ", requester: " << requester << ", unknown request");
- return;
+ ResourceManager()->FreeResources(reqId.TxId);
+ // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order:
+ // KqpProxy -> SessionActor -> Executer -> ComputeActor
}
-
- ResourceManager()->FreeResources(txId);
-
- // don't send to executer and compute actors, they have their own timers with smaller timeout
}
private:
@@ -546,6 +538,11 @@ private:
IKqpNodeComputeActorFactory* CaFactory;
NRm::IKqpResourceManager* ResourceManager_ = nullptr;
NKqpNode::TState State;
+ struct TRequestId {
+ ui64 TxId;
+ TActorId Requester;
+ };
+ std::multimap<TInstant, TRequestId> ExpiringRequests;
};