diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-01 15:53:19 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-01 15:53:19 +0300 |
commit | 313655d9256cea661cf09efb7c94d7c9d0a937d6 (patch) | |
tree | 5dfc32ee32dd27e4048adeadea4725f95deeaf04 | |
parent | 0a4eb28a1cfee4927aaf1539d9f6c65282764880 (diff) | |
download | ydb-313655d9256cea661cf09efb7c94d7c9d0a937d6.tar.gz |
Implement one LongTimer in KQP
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_request.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 57 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 60 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 78 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 30 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 4 |
8 files changed, 123 insertions, 141 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_request.cpp b/ydb/core/kqp/compile_service/kqp_compile_request.cpp index 3e846d53ac3..d8d28b37f2d 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_request.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_request.cpp @@ -47,9 +47,6 @@ public: Orbit, Query ? Query->UserSid : 0); - TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(), - new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup())); - TMaybe<TKqpQueryId> query; std::swap(Query, query); @@ -119,20 +116,8 @@ public: DeferredResponse.Reset(); } - void HandleTimeout(const TActorContext& ctx) { - LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Compile request deadline exceeded" - << ", self: " << ctx.SelfID); - - NYql::TIssue issue(NYql::TPosition(), "Deadline exceeded during query compilation."); - return ReplyError(Ydb::StatusIds::TIMEOUT, {issue}, ctx); - } - - void Die(const NActors::TActorContext& ctx) override { - if (TimeoutTimerId) { - ctx.Send(TimeoutTimerId, new TEvents::TEvPoisonPill()); - } - - TBase::Die(ctx); + void Handle(TEvKqp::TEvAbortExecution::TPtr& , const TActorContext &ctx) { + this->Die(ctx); } private: @@ -141,7 +126,7 @@ private: switch (ev->GetTypeRewrite()) { HFunc(TEvKqp::TEvCompileResponse, Handle); HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + HFunc(TEvKqp::TEvAbortExecution, Handle); default: UnexpectedEvent("MainState", ev->GetTypeRewrite(), ctx); } @@ -327,7 +312,6 @@ private: bool KeepInCache = false; TInstant Deadline; TKqpDbCountersPtr DbCounters; - TActorId TimeoutTimerId; THashMap<TTableId, ui64> TableVersions; THolder<TEvKqp::TEvCompileResponse> DeferredResponse; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index ac47bc41ffd..1178df22fa9 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -269,7 +269,6 @@ public: hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); - hFunc(TEvents::TEvWakeup, HandleTimeout); default: UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); } @@ -309,7 +308,6 @@ private: hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare); hFunc(TEvKqp::TEvAbortExecution, HandlePrepare); - hFunc(TEvents::TEvWakeup, HandlePrepare); hFunc(TEvents::TEvUndelivered, HandleUndelivered); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); @@ -516,11 +514,6 @@ private: TBase::HandleAbortExecution(ev); } - void HandlePrepare(TEvents::TEvWakeup::TPtr& ev) { - CancelProposal(0); - TBase::HandleTimeout(ev); - } - void CancelProposal(ui64 exceptShardId) { for (auto& [shardId, state] : ShardStates) { if (shardId != exceptShardId && @@ -909,7 +902,6 @@ private: hFunc(TEvDqCompute::TEvState, HandleComputeStats); hFunc(TEvDqCompute::TEvChannelData, HandleExecute); hFunc(TEvKqp::TEvAbortExecution, HandleExecute); - hFunc(TEvents::TEvWakeup, HandleTimeout); IgnoreFunc(TEvInterconnect::TEvNodeConnected); default: UnexpectedEvent("ExecuteState", ev->GetTypeRewrite()); @@ -1933,7 +1925,6 @@ private: switch (ev->GetTypeRewrite()) { hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); - hFunc(TEvents::TEvWakeup, HandleTimeout); default: UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite()); } @@ -2186,7 +2177,7 @@ private: Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), Request.Snapshot, Database, Nothing(), Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, - Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), + Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), ExecuterSpan, {}, ExecuterRetriesConfig); Planner->ProcessTasksForDataExecuter(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index d163396401c..6446cde48b2 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -106,8 +106,8 @@ protected: }; public: - TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, - TKqpRequestCounters::TPtr counters, + TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, + TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, ui64 spanVerbosity = 0, TString spanName = "no_name") : Request(std::move(request)) @@ -300,6 +300,7 @@ protected: STATEFN(ReadyState) { switch (ev->GetTypeRewrite()) { hFunc(TEvKqpExecuter::TEvTxRequest, HandleReady); + hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); default: { UnexpectedEvent("ReadyState", ev->GetTypeRewrite()); } @@ -318,25 +319,6 @@ protected: ActorIdToProto(this->SelfId(), progressEv->Record.MutableExecuterActorId()); this->Send(Target, progressEv.Release()); - const auto now = TAppData::TimeProvider->Now(); - const auto& ctx = TlsActivationContext->AsActorContext(); - TMaybe<TDuration> timeout; - if (Deadline) { - timeout = *Deadline - now; - DeadlineActor = CreateLongTimer(ctx, *timeout, - new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup(0))); - } - - TMaybe<TDuration> cancelAfter; - if (CancelAt) { - cancelAfter = *CancelAt - now; - CancelAtActor = CreateLongTimer(ctx, *cancelAfter, - new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup(1))); - } - - LOG_I("Begin execution. Operation timeout: " << timeout << ", cancelAfter: " << cancelAfter - << ", txs: " << Request.Transactions.size()); - if (IsDebugLogEnabled()) { for (auto& tx : Request.Transactions) { LOG_D("Executing physical tx, type: " << (ui32) tx.Body->GetType() << ", stages: " << tx.Body->StagesSize()); @@ -356,6 +338,7 @@ protected: ExecuterTableResolveSpan = NWilson::TSpan(TWilsonKqp::ExecuterTableResolve, ExecuterStateSpan.GetTraceId(), "ExecuterTableResolve", NWilson::EFlags::AUTO_END); + auto now = TAppData::TimeProvider->Now(); StartResolveTime = now; if (Stats) { @@ -531,24 +514,12 @@ protected: if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) { InternalError(issues); } else if (statusCode == Ydb::StatusIds::TIMEOUT) { - AbortExecutionAndDie(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); + AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); } else { RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues); } } - void HandleTimeout(TEvents::TEvWakeup::TPtr& ev) { - bool cancel = ev->Get()->Tag == 1; - LOG_I((cancel ? "CancelAt" : "Timeout") << " exceeded. Send timeout event to the rpc actor " << Target); - if (cancel) { - CancelAtActor = {}; - AbortExecutionAndDie(NYql::NDqProto::StatusIds::CANCELLED, "Request timeout exceeded"); - } else { - DeadlineActor = {}; - AbortExecutionAndDie(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); - } - } - protected: void CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates) { YQL_ENSURE(task.ComputeActorId); @@ -1016,14 +987,18 @@ protected: ReplyErrorAndDie(status, &issues); } - void AbortExecutionAndDie(NYql::NDqProto::StatusIds::StatusCode status, const TString& message) { + void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) { LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message); - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded"); if (ExecuterSpan) { ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); } - this->Send(Target, abortEv.Release()); + // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). + // If it have come from SessionActor there is no need to send new TEvAbortExecution back + if (abortSender != Target) { + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded"); + this->Send(Target, abortEv.Release()); + } Request.Transactions.crop(0); TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); this->PassAway(); @@ -1112,12 +1087,6 @@ protected: protected: void PassAway() override { LOG_D("terminate execution."); - if (DeadlineActor) { - this->Send(DeadlineActor, new TEvents::TEvPoison); - } - if (CancelAtActor) { - this->Send(CancelAtActor, new TEvents::TEvPoison); - } if (KqpShardsResolverId) { this->Send(KqpShardsResolverId, new TEvents::TEvPoison); } @@ -1172,9 +1141,7 @@ protected: std::unique_ptr<TQueryExecutionStats> Stats; TInstant StartTime; TMaybe<TInstant> Deadline; - TActorId DeadlineActor; TMaybe<TInstant> CancelAt; - TActorId CancelAtActor; TActorId Target; ui64 TxId = 0; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 85542fbcdac..a7743150080 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -103,7 +103,6 @@ public: hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); - hFunc(TEvents::TEvWakeup, HandleTimeout); default: UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); } @@ -121,7 +120,6 @@ private: hFunc(TEvDqCompute::TEvState, HandleComputeStats); hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); - hFunc(TEvents::TEvWakeup, HandleTimeout); hFunc(TEvents::TEvUndelivered, HandleUndelivered); hFunc(TEvPrivate::TEvRetry, HandleRetry); hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 55d2a26a505..ee2a2969ab6 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -52,7 +52,7 @@ TString MakeKqpProxyBoardPath(const TString& database) { static constexpr TDuration DEFAULT_KEEP_ALIVE_TIMEOUT = TDuration::MilliSeconds(5000); -static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds(10); +static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds(50); static constexpr TDuration DEFAULT_CREATE_SESSION_TIMEOUT = TDuration::MilliSeconds(5000); @@ -115,11 +115,22 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { struct TEvCollectPeerProxyData: public TEventLocal<TEvCollectPeerProxyData, EEv::EvCollectPeerProxyData> {}; struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> { - public: - ui64 RequestId; - TDuration Timeout; - - TEvOnRequestTimeout(ui64 requestId, TDuration timeout): RequestId(requestId), Timeout(timeout) {}; + ui64 RequestId; + TDuration Timeout; + NYql::NDqProto::StatusIds::StatusCode Status; + int Round; + + TEvOnRequestTimeout(ui64 requestId, TDuration timeout, NYql::NDqProto::StatusIds::StatusCode status, int round) + : RequestId(requestId) + , Timeout(timeout) + , Status(status) + , Round(round) + {} + + void TickNextRound() { + ++Round; + Timeout = DEFAULT_EXTRA_TIMEOUT_WAIT; + } }; struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {}; @@ -539,13 +550,15 @@ public: } } - // We add extra milliseconds to the user-specified timeout, so it means we give additional priority for worker replies, - // because it is much better to give detailed error message rather than generic timeout. - // For example, it helps to avoid race in event order when worker and proxy recieve timeout at the same moment. - // If worker located in the different datacenter we should better substract some RTT estimate, but at this point it's not done. + auto cancelAfter = ev->Get()->GetCancelAfter(); auto timeout = ev->Get()->GetOperationTimeout(); - auto timeoutMs = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig) + DEFAULT_EXTRA_TIMEOUT_WAIT; - StartQueryTimeout(requestId, timeoutMs); + auto timerDuration = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig); + if (cancelAfter) { + timerDuration = Min(timerDuration, cancelAfter); + } + KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "TEvQueryRequest, set timer for: " << timerDuration << " timeout: " << timeout << " cancelAfter: " << cancelAfter); + auto status = timerDuration == cancelAfter ? NYql::NDqProto::StatusIds::CANCELLED : NYql::NDqProto::StatusIds::TIMEOUT; + StartQueryTimeout(requestId, timerDuration, status); Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId); KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId << ", targetId: " << targetId << ", sessionId: " << sessionId); @@ -933,10 +946,10 @@ public: Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); } - void StartQueryTimeout(ui64 requestId, TDuration timeout) { + void StartQueryTimeout(ui64 requestId, TDuration timeout, NYql::NDqProto::StatusIds::StatusCode status = NYql::NDqProto::StatusIds::TIMEOUT) { TActorId timeoutTimer = CreateLongTimer( TlsActivationContext->AsActorContext(), timeout, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvOnRequestTimeout(requestId, timeout)) + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvOnRequestTimeout{requestId, timeout, status, 0}) ); KQP_PROXY_LOG_D("Scheduled timeout timer for requestId: " << requestId << " timeout: " << timeout << " actor id: " << timeoutTimer); @@ -954,7 +967,9 @@ public: } void Handle(TEvPrivate::TEvOnRequestTimeout::TPtr& ev) { + auto* msg = ev->Get(); ui64 requestId = ev->Get()->RequestId; + TimeoutTimers.erase(requestId); KQP_PROXY_LOG_D("Handle TEvPrivate::TEvOnRequestTimeout(" << requestId << ")"); const TKqpProxyRequest* reqInfo = PendingRequests.FindPtr(requestId); @@ -963,9 +978,20 @@ public: return; } - TString message = TStringBuilder() << "Query did not complete within specified timeout, session id " << reqInfo->SessionId; - KQP_PROXY_LOG_D("Reply timeout: requestId " << requestId << " sessionId" << reqInfo->SessionId); - ReplyProcessError(Ydb::StatusIds::TIMEOUT, message, requestId); + KQP_PROXY_LOG_D("Reply timeout: requestId " << requestId << " sessionId: " << reqInfo->SessionId + << " status: " << NYql::NDq::DqStatusToYdbStatus(msg->Status) << " round: " << msg->Round); + + const TKqpSessionInfo* info = LocalSessions->FindPtr(reqInfo->SessionId); + if (msg->Round == 0 && info) { + TString message = TStringBuilder() << "request's " << (msg->Status == NYql::NDqProto::StatusIds::TIMEOUT ? "timeout" : "cancelAfter") << " exceeded"; + Send(info->WorkerId, new TEvKqp::TEvAbortExecution(msg->Status, message)); + auto newEv = ev->Release().Release(); + newEv->TickNextRound(); + Schedule(newEv->Timeout, newEv); + } else { + TString message = TStringBuilder() << "Query did not complete within specified timeout, session id " << reqInfo->SessionId; + ReplyProcessError(NYql::NDq::DqStatusToYdbStatus(msg->Status), message, requestId); + } } void Handle(TEvKqp::TEvCloseSessionResponse::TPtr& ev) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 033ee620faa..23b4b87a2c4 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -645,7 +645,7 @@ public: QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit(), QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId()); - RegisterWithSameMailbox(compileRequestActor); + CompileActorId = RegisterWithSameMailbox(compileRequestActor); Become(&TKqpSessionActor::CompileState); } @@ -654,14 +654,18 @@ public: ReplyBusy(ev); } - void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) { - auto compileResult = ev->Get()->CompileResult; + void Handle(TEvKqp::TEvCompileResponse::TPtr& ev) { + if (ev->Sender != CompileActorId) { + return; + } + CompileActorId = TActorId{}; TTimerGuard timer(this); - QueryState->Orbit = std::move(ev->Get()->Orbit); - QueryState->MaxReadType = compileResult->MaxReadType; + auto compileResult = ev->Get()->CompileResult; YQL_ENSURE(compileResult); YQL_ENSURE(QueryState); + QueryState->MaxReadType = compileResult->MaxReadType; + QueryState->Orbit = std::move(ev->Get()->Orbit); LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status); @@ -1192,7 +1196,7 @@ public: bool pure = tx && tx->IsPureTx(); auto request = PrepareRequest(tx, pure, QueryState.get()); - LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit + LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " pure: " << pure << " commit: " << commit << " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size()); if (!CheckTopicOperations()) { @@ -1293,13 +1297,13 @@ public: RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig()); auto exId = RegisterWithSameMailbox(executerActor); - LOG_D("Created new KQP executer: " << exId); + LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); auto ev = std::make_unique<TEvTxUserProxy::TEvProposeKqpTransaction>(exId); Send(MakeTxProxyID(), ev.release()); if (!isRollback) { Y_VERIFY(!ExecuterId); - ExecuterId = exId; } + ExecuterId = exId; } @@ -1307,21 +1311,6 @@ public: void HandleNoop(T&) { } - void HandleTxResponse(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { - if (ev->Sender == ExecuterId) { - auto& response = ev->Get()->Record.GetResponse(); - TIssues issues; - IssuesFromMessage(response.GetIssues(), issues); - - auto err = TStringBuilder() << "Got response from our executor: " << ev->Sender - << ", Status: " << ev->Get()->Record.GetResponse().GetStatus() - << ", Issues: " << issues.ToString() - << " while we are in " << CurrentStateFuncName(); - LOG_E(err); - YQL_ENSURE(false, "" << err); - } - } - void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) { ReplyBusy(ev); } @@ -1444,14 +1433,23 @@ public: TlsActivationContext->Send(ev->Forward(ExecuterId)); } - void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) { + void Handle(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; - ExecuterId = TActorId{}; + if (CompileActorId) { + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded"); + Send(std::exchange(CompileActorId, {}), abortEv.Release()); + } + + if (ExecuterId) { + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded"); + Send(std::exchange(ExecuterId, {}), abortEv.Release()); + } const auto& issues = ev->Get()->GetIssues(); - LOG_I("Got TEvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())); - ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "Got AbortExecution", MessageFromIssues(issues)); + TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName(); + LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())); + ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues)); } void CollectSystemViewQueryStats(const NKqpProto::TKqpStatsQuery* stats, TDuration queryDuration, @@ -1922,6 +1920,9 @@ public: } void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { + if (ev->Sender != ExecuterId) { + return; + } if (QueryState) { QueryState->Orbit = std::move(ev->Get()->Orbit); } @@ -2033,7 +2034,10 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse); + + // forgotten messages from previous aborted request + hFunc(TEvKqp::TEvCompileResponse, HandleNoop); + hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); default: UnexpectedEvent("ReadyState", ev); } @@ -2048,13 +2052,16 @@ public: try { switch (ev->GetTypeRewrite()) { hFunc(TEvKqp::TEvQueryRequest, HandleCompile); - hFunc(TEvKqp::TEvCompileResponse, HandleCompile); + hFunc(TEvKqp::TEvCompileResponse, Handle); + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse); hFunc(NGRpcService::TEvClientLost, HandleClientLost); + + // forgotten messages from previous aborted request + hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); default: UnexpectedEvent("CompileState", ev); } @@ -2075,7 +2082,7 @@ public: hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleExecute); - hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute); + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle); hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute); hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); @@ -2083,6 +2090,9 @@ public: hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(NGRpcService::TEvClientLost, HandleClientLost); + // forgotten messages from previous aborted request + hFunc(TEvKqp::TEvCompileResponse, Handle); + // always come from WorkerActor hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); default: @@ -2102,13 +2112,16 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleCleanup); hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup); hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); - hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(NGRpcService::TEvClientLost, HandleNoop); + // forgotten messages from previous aborted request + hFunc(TEvKqp::TEvCompileResponse, HandleNoop); + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop); + // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); default: @@ -2282,6 +2295,7 @@ private: TIntrusivePtr<TModuleResolverState> ModuleResolverState; TKqpSettings::TConstPtr KqpSettings; std::optional<TActorId> WorkerId; + TActorId CompileActorId; TActorId ExecuterId; std::shared_ptr<TKqpQueryState> QueryState; diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index 89f9d7323f3..67d835b83eb 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -387,26 +387,32 @@ Y_UNIT_TEST_SUITE(KqpLimits) { })); } - Y_UNIT_TEST(QueryExecCancel) { + Y_UNIT_TEST(QueryExecTimeoutCancel) { TKikimrRunner kikimr; CreateLargeTable(kikimr, 500000, 10, 100, 5000, 1); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - auto prepareResult = session.PrepareDataQuery(Q_(R"( - SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "11111"; - )")).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); - auto dataQuery = prepareResult.GetQuery(); - - auto settings = TExecDataQuerySettings() - .CancelAfter(TDuration::MilliSeconds(100)); + for (auto status : {EStatus::TIMEOUT, EStatus::CANCELLED}) { + auto prepareResult = session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "11111"; + )")).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + auto settings = TExecDataQuerySettings(); + if (status == EStatus::TIMEOUT) { + settings.OperationTimeout(TDuration::MilliSeconds(100)); + } else { + settings.CancelAfter(TDuration::MilliSeconds(100)); + } - auto result = dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); + auto result = dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); - result.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CANCELLED); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status); + } } Y_UNIT_TEST(QueryExecTimeout) { diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 400361d6fe4..23e4d780639 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -2059,7 +2059,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, StreamLookup TString tmpSessionId = CreateSessionRPC(runtime); auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 3;"), tmpSessionId, "", true); req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1); - req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1); auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req))); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT); } @@ -2085,7 +2084,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, StreamLookup TString tmpSessionId = CreateSessionRPC(runtime); auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 3;"), tmpSessionId, "", true); req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1); - req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1); auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req))); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT); } @@ -2229,7 +2227,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, StreamLookup) auto sender4 = CreateSessionRPC(runtime); auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender4, "", true); req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1); - req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1); auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req))); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT); } @@ -2270,7 +2267,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, StreamLookup) auto sender5 = CreateSessionRPC(runtime); auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender5, "", true); req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1); - req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1); auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req))); UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT); } |