aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-03-01 15:53:19 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-03-01 15:53:19 +0300
commit313655d9256cea661cf09efb7c94d7c9d0a937d6 (patch)
tree5dfc32ee32dd27e4048adeadea4725f95deeaf04
parent0a4eb28a1cfee4927aaf1539d9f6c65282764880 (diff)
downloadydb-313655d9256cea661cf09efb7c94d7c9d0a937d6.tar.gz
Implement one LongTimer in KQP
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_request.cpp22
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h57
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp60
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp78
-rw-r--r--ydb/core/kqp/ut/query/kqp_limits_ut.cpp30
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp4
8 files changed, 123 insertions, 141 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_request.cpp b/ydb/core/kqp/compile_service/kqp_compile_request.cpp
index 3e846d53ac3..d8d28b37f2d 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_request.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_request.cpp
@@ -47,9 +47,6 @@ public:
Orbit,
Query ? Query->UserSid : 0);
- TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
- new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup()));
-
TMaybe<TKqpQueryId> query;
std::swap(Query, query);
@@ -119,20 +116,8 @@ public:
DeferredResponse.Reset();
}
- void HandleTimeout(const TActorContext& ctx) {
- LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Compile request deadline exceeded"
- << ", self: " << ctx.SelfID);
-
- NYql::TIssue issue(NYql::TPosition(), "Deadline exceeded during query compilation.");
- return ReplyError(Ydb::StatusIds::TIMEOUT, {issue}, ctx);
- }
-
- void Die(const NActors::TActorContext& ctx) override {
- if (TimeoutTimerId) {
- ctx.Send(TimeoutTimerId, new TEvents::TEvPoisonPill());
- }
-
- TBase::Die(ctx);
+ void Handle(TEvKqp::TEvAbortExecution::TPtr& , const TActorContext &ctx) {
+ this->Die(ctx);
}
private:
@@ -141,7 +126,7 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(TEvKqp::TEvCompileResponse, Handle);
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ HFunc(TEvKqp::TEvAbortExecution, Handle);
default:
UnexpectedEvent("MainState", ev->GetTypeRewrite(), ctx);
}
@@ -327,7 +312,6 @@ private:
bool KeepInCache = false;
TInstant Deadline;
TKqpDbCountersPtr DbCounters;
- TActorId TimeoutTimerId;
THashMap<TTableId, ui64> TableVersions;
THolder<TEvKqp::TEvCompileResponse> DeferredResponse;
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index ac47bc41ffd..1178df22fa9 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -269,7 +269,6 @@ public:
hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
- hFunc(TEvents::TEvWakeup, HandleTimeout);
default:
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
}
@@ -309,7 +308,6 @@ private:
hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
- hFunc(TEvents::TEvWakeup, HandlePrepare);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
@@ -516,11 +514,6 @@ private:
TBase::HandleAbortExecution(ev);
}
- void HandlePrepare(TEvents::TEvWakeup::TPtr& ev) {
- CancelProposal(0);
- TBase::HandleTimeout(ev);
- }
-
void CancelProposal(ui64 exceptShardId) {
for (auto& [shardId, state] : ShardStates) {
if (shardId != exceptShardId &&
@@ -909,7 +902,6 @@ private:
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
- hFunc(TEvents::TEvWakeup, HandleTimeout);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default:
UnexpectedEvent("ExecuteState", ev->GetTypeRewrite());
@@ -1933,7 +1925,6 @@ private:
switch (ev->GetTypeRewrite()) {
hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
- hFunc(TEvents::TEvWakeup, HandleTimeout);
default:
UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite());
}
@@ -2186,7 +2177,7 @@ private:
Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), Request.Snapshot,
Database, Nothing(), Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
- Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(),
+ Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(),
ExecuterSpan, {}, ExecuterRetriesConfig);
Planner->ProcessTasksForDataExecuter();
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index d163396401c..6446cde48b2 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -106,8 +106,8 @@ protected:
};
public:
- TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters,
+ TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
+ TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
ui64 spanVerbosity = 0, TString spanName = "no_name")
: Request(std::move(request))
@@ -300,6 +300,7 @@ protected:
STATEFN(ReadyState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpExecuter::TEvTxRequest, HandleReady);
+ hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
default: {
UnexpectedEvent("ReadyState", ev->GetTypeRewrite());
}
@@ -318,25 +319,6 @@ protected:
ActorIdToProto(this->SelfId(), progressEv->Record.MutableExecuterActorId());
this->Send(Target, progressEv.Release());
- const auto now = TAppData::TimeProvider->Now();
- const auto& ctx = TlsActivationContext->AsActorContext();
- TMaybe<TDuration> timeout;
- if (Deadline) {
- timeout = *Deadline - now;
- DeadlineActor = CreateLongTimer(ctx, *timeout,
- new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup(0)));
- }
-
- TMaybe<TDuration> cancelAfter;
- if (CancelAt) {
- cancelAfter = *CancelAt - now;
- CancelAtActor = CreateLongTimer(ctx, *cancelAfter,
- new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup(1)));
- }
-
- LOG_I("Begin execution. Operation timeout: " << timeout << ", cancelAfter: " << cancelAfter
- << ", txs: " << Request.Transactions.size());
-
if (IsDebugLogEnabled()) {
for (auto& tx : Request.Transactions) {
LOG_D("Executing physical tx, type: " << (ui32) tx.Body->GetType() << ", stages: " << tx.Body->StagesSize());
@@ -356,6 +338,7 @@ protected:
ExecuterTableResolveSpan = NWilson::TSpan(TWilsonKqp::ExecuterTableResolve, ExecuterStateSpan.GetTraceId(), "ExecuterTableResolve", NWilson::EFlags::AUTO_END);
+ auto now = TAppData::TimeProvider->Now();
StartResolveTime = now;
if (Stats) {
@@ -531,24 +514,12 @@ protected:
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
InternalError(issues);
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
- AbortExecutionAndDie(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
+ AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
} else {
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
}
}
- void HandleTimeout(TEvents::TEvWakeup::TPtr& ev) {
- bool cancel = ev->Get()->Tag == 1;
- LOG_I((cancel ? "CancelAt" : "Timeout") << " exceeded. Send timeout event to the rpc actor " << Target);
- if (cancel) {
- CancelAtActor = {};
- AbortExecutionAndDie(NYql::NDqProto::StatusIds::CANCELLED, "Request timeout exceeded");
- } else {
- DeadlineActor = {};
- AbortExecutionAndDie(NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
- }
- }
-
protected:
void CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates) {
YQL_ENSURE(task.ComputeActorId);
@@ -1016,14 +987,18 @@ protected:
ReplyErrorAndDie(status, &issues);
}
- void AbortExecutionAndDie(NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
+ void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
if (ExecuterSpan) {
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
}
- this->Send(Target, abortEv.Release());
+ // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
+ // If it have come from SessionActor there is no need to send new TEvAbortExecution back
+ if (abortSender != Target) {
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
+ this->Send(Target, abortEv.Release());
+ }
Request.Transactions.crop(0);
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
this->PassAway();
@@ -1112,12 +1087,6 @@ protected:
protected:
void PassAway() override {
LOG_D("terminate execution.");
- if (DeadlineActor) {
- this->Send(DeadlineActor, new TEvents::TEvPoison);
- }
- if (CancelAtActor) {
- this->Send(CancelAtActor, new TEvents::TEvPoison);
- }
if (KqpShardsResolverId) {
this->Send(KqpShardsResolverId, new TEvents::TEvPoison);
}
@@ -1172,9 +1141,7 @@ protected:
std::unique_ptr<TQueryExecutionStats> Stats;
TInstant StartTime;
TMaybe<TInstant> Deadline;
- TActorId DeadlineActor;
TMaybe<TInstant> CancelAt;
- TActorId CancelAtActor;
TActorId Target;
ui64 TxId = 0;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 85542fbcdac..a7743150080 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -103,7 +103,6 @@ public:
hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
- hFunc(TEvents::TEvWakeup, HandleTimeout);
default:
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
}
@@ -121,7 +120,6 @@ private:
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
- hFunc(TEvents::TEvWakeup, HandleTimeout);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
hFunc(TEvPrivate::TEvRetry, HandleRetry);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 55d2a26a505..ee2a2969ab6 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -52,7 +52,7 @@ TString MakeKqpProxyBoardPath(const TString& database) {
static constexpr TDuration DEFAULT_KEEP_ALIVE_TIMEOUT = TDuration::MilliSeconds(5000);
-static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds(10);
+static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds(50);
static constexpr TDuration DEFAULT_CREATE_SESSION_TIMEOUT = TDuration::MilliSeconds(5000);
@@ -115,11 +115,22 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
struct TEvCollectPeerProxyData: public TEventLocal<TEvCollectPeerProxyData, EEv::EvCollectPeerProxyData> {};
struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> {
- public:
- ui64 RequestId;
- TDuration Timeout;
-
- TEvOnRequestTimeout(ui64 requestId, TDuration timeout): RequestId(requestId), Timeout(timeout) {};
+ ui64 RequestId;
+ TDuration Timeout;
+ NYql::NDqProto::StatusIds::StatusCode Status;
+ int Round;
+
+ TEvOnRequestTimeout(ui64 requestId, TDuration timeout, NYql::NDqProto::StatusIds::StatusCode status, int round)
+ : RequestId(requestId)
+ , Timeout(timeout)
+ , Status(status)
+ , Round(round)
+ {}
+
+ void TickNextRound() {
+ ++Round;
+ Timeout = DEFAULT_EXTRA_TIMEOUT_WAIT;
+ }
};
struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {};
@@ -539,13 +550,15 @@ public:
}
}
- // We add extra milliseconds to the user-specified timeout, so it means we give additional priority for worker replies,
- // because it is much better to give detailed error message rather than generic timeout.
- // For example, it helps to avoid race in event order when worker and proxy recieve timeout at the same moment.
- // If worker located in the different datacenter we should better substract some RTT estimate, but at this point it's not done.
+ auto cancelAfter = ev->Get()->GetCancelAfter();
auto timeout = ev->Get()->GetOperationTimeout();
- auto timeoutMs = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig) + DEFAULT_EXTRA_TIMEOUT_WAIT;
- StartQueryTimeout(requestId, timeoutMs);
+ auto timerDuration = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig);
+ if (cancelAfter) {
+ timerDuration = Min(timerDuration, cancelAfter);
+ }
+ KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "TEvQueryRequest, set timer for: " << timerDuration << " timeout: " << timeout << " cancelAfter: " << cancelAfter);
+ auto status = timerDuration == cancelAfter ? NYql::NDqProto::StatusIds::CANCELLED : NYql::NDqProto::StatusIds::TIMEOUT;
+ StartQueryTimeout(requestId, timerDuration, status);
Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId
<< ", targetId: " << targetId << ", sessionId: " << sessionId);
@@ -933,10 +946,10 @@ public:
Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
}
- void StartQueryTimeout(ui64 requestId, TDuration timeout) {
+ void StartQueryTimeout(ui64 requestId, TDuration timeout, NYql::NDqProto::StatusIds::StatusCode status = NYql::NDqProto::StatusIds::TIMEOUT) {
TActorId timeoutTimer = CreateLongTimer(
TlsActivationContext->AsActorContext(), timeout,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvOnRequestTimeout(requestId, timeout))
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvOnRequestTimeout{requestId, timeout, status, 0})
);
KQP_PROXY_LOG_D("Scheduled timeout timer for requestId: " << requestId << " timeout: " << timeout << " actor id: " << timeoutTimer);
@@ -954,7 +967,9 @@ public:
}
void Handle(TEvPrivate::TEvOnRequestTimeout::TPtr& ev) {
+ auto* msg = ev->Get();
ui64 requestId = ev->Get()->RequestId;
+ TimeoutTimers.erase(requestId);
KQP_PROXY_LOG_D("Handle TEvPrivate::TEvOnRequestTimeout(" << requestId << ")");
const TKqpProxyRequest* reqInfo = PendingRequests.FindPtr(requestId);
@@ -963,9 +978,20 @@ public:
return;
}
- TString message = TStringBuilder() << "Query did not complete within specified timeout, session id " << reqInfo->SessionId;
- KQP_PROXY_LOG_D("Reply timeout: requestId " << requestId << " sessionId" << reqInfo->SessionId);
- ReplyProcessError(Ydb::StatusIds::TIMEOUT, message, requestId);
+ KQP_PROXY_LOG_D("Reply timeout: requestId " << requestId << " sessionId: " << reqInfo->SessionId
+ << " status: " << NYql::NDq::DqStatusToYdbStatus(msg->Status) << " round: " << msg->Round);
+
+ const TKqpSessionInfo* info = LocalSessions->FindPtr(reqInfo->SessionId);
+ if (msg->Round == 0 && info) {
+ TString message = TStringBuilder() << "request's " << (msg->Status == NYql::NDqProto::StatusIds::TIMEOUT ? "timeout" : "cancelAfter") << " exceeded";
+ Send(info->WorkerId, new TEvKqp::TEvAbortExecution(msg->Status, message));
+ auto newEv = ev->Release().Release();
+ newEv->TickNextRound();
+ Schedule(newEv->Timeout, newEv);
+ } else {
+ TString message = TStringBuilder() << "Query did not complete within specified timeout, session id " << reqInfo->SessionId;
+ ReplyProcessError(NYql::NDq::DqStatusToYdbStatus(msg->Status), message, requestId);
+ }
}
void Handle(TEvKqp::TEvCloseSessionResponse::TPtr& ev) {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 033ee620faa..23b4b87a2c4 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -645,7 +645,7 @@ public:
QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit(),
QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId());
- RegisterWithSameMailbox(compileRequestActor);
+ CompileActorId = RegisterWithSameMailbox(compileRequestActor);
Become(&TKqpSessionActor::CompileState);
}
@@ -654,14 +654,18 @@ public:
ReplyBusy(ev);
}
- void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) {
- auto compileResult = ev->Get()->CompileResult;
+ void Handle(TEvKqp::TEvCompileResponse::TPtr& ev) {
+ if (ev->Sender != CompileActorId) {
+ return;
+ }
+ CompileActorId = TActorId{};
TTimerGuard timer(this);
- QueryState->Orbit = std::move(ev->Get()->Orbit);
- QueryState->MaxReadType = compileResult->MaxReadType;
+ auto compileResult = ev->Get()->CompileResult;
YQL_ENSURE(compileResult);
YQL_ENSURE(QueryState);
+ QueryState->MaxReadType = compileResult->MaxReadType;
+ QueryState->Orbit = std::move(ev->Get()->Orbit);
LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status);
@@ -1192,7 +1196,7 @@ public:
bool pure = tx && tx->IsPureTx();
auto request = PrepareRequest(tx, pure, QueryState.get());
- LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit
+ LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " pure: " << pure << " commit: " << commit
<< " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size());
if (!CheckTopicOperations()) {
@@ -1293,13 +1297,13 @@ public:
RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterRetriesConfig());
auto exId = RegisterWithSameMailbox(executerActor);
- LOG_D("Created new KQP executer: " << exId);
+ LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
auto ev = std::make_unique<TEvTxUserProxy::TEvProposeKqpTransaction>(exId);
Send(MakeTxProxyID(), ev.release());
if (!isRollback) {
Y_VERIFY(!ExecuterId);
- ExecuterId = exId;
}
+ ExecuterId = exId;
}
@@ -1307,21 +1311,6 @@ public:
void HandleNoop(T&) {
}
- void HandleTxResponse(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
- if (ev->Sender == ExecuterId) {
- auto& response = ev->Get()->Record.GetResponse();
- TIssues issues;
- IssuesFromMessage(response.GetIssues(), issues);
-
- auto err = TStringBuilder() << "Got response from our executor: " << ev->Sender
- << ", Status: " << ev->Get()->Record.GetResponse().GetStatus()
- << ", Issues: " << issues.ToString()
- << " while we are in " << CurrentStateFuncName();
- LOG_E(err);
- YQL_ENSURE(false, "" << err);
- }
- }
-
void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) {
ReplyBusy(ev);
}
@@ -1444,14 +1433,23 @@ public:
TlsActivationContext->Send(ev->Forward(ExecuterId));
}
- void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ void Handle(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
- ExecuterId = TActorId{};
+ if (CompileActorId) {
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded");
+ Send(std::exchange(CompileActorId, {}), abortEv.Release());
+ }
+
+ if (ExecuterId) {
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded");
+ Send(std::exchange(ExecuterId, {}), abortEv.Release());
+ }
const auto& issues = ev->Get()->GetIssues();
- LOG_I("Got TEvAbortExecution, status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()));
- ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "Got AbortExecution", MessageFromIssues(issues));
+ TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
+ LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()));
+ ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
}
void CollectSystemViewQueryStats(const NKqpProto::TKqpStatsQuery* stats, TDuration queryDuration,
@@ -1922,6 +1920,9 @@ public:
}
void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
+ if (ev->Sender != ExecuterId) {
+ return;
+ }
if (QueryState) {
QueryState->Orbit = std::move(ev->Get()->Orbit);
}
@@ -2033,7 +2034,10 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse);
+
+ // forgotten messages from previous aborted request
+ hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
+ hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
default:
UnexpectedEvent("ReadyState", ev);
}
@@ -2048,13 +2052,16 @@ public:
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqp::TEvQueryRequest, HandleCompile);
- hFunc(TEvKqp::TEvCompileResponse, HandleCompile);
+ hFunc(TEvKqp::TEvCompileResponse, Handle);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse);
hFunc(NGRpcService::TEvClientLost, HandleClientLost);
+
+ // forgotten messages from previous aborted request
+ hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
default:
UnexpectedEvent("CompileState", ev);
}
@@ -2075,7 +2082,7 @@ public:
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleExecute);
- hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
@@ -2083,6 +2090,9 @@ public:
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(NGRpcService::TEvClientLost, HandleClientLost);
+ // forgotten messages from previous aborted request
+ hFunc(TEvKqp::TEvCompileResponse, Handle);
+
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
default:
@@ -2102,13 +2112,16 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleCleanup);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup);
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);
- hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(NGRpcService::TEvClientLost, HandleNoop);
+ // forgotten messages from previous aborted request
+ hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
+
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
default:
@@ -2282,6 +2295,7 @@ private:
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TKqpSettings::TConstPtr KqpSettings;
std::optional<TActorId> WorkerId;
+ TActorId CompileActorId;
TActorId ExecuterId;
std::shared_ptr<TKqpQueryState> QueryState;
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index 89f9d7323f3..67d835b83eb 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -387,26 +387,32 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}));
}
- Y_UNIT_TEST(QueryExecCancel) {
+ Y_UNIT_TEST(QueryExecTimeoutCancel) {
TKikimrRunner kikimr;
CreateLargeTable(kikimr, 500000, 10, 100, 5000, 1);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
- auto prepareResult = session.PrepareDataQuery(Q_(R"(
- SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "11111";
- )")).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
- auto dataQuery = prepareResult.GetQuery();
-
- auto settings = TExecDataQuerySettings()
- .CancelAfter(TDuration::MilliSeconds(100));
+ for (auto status : {EStatus::TIMEOUT, EStatus::CANCELLED}) {
+ auto prepareResult = session.PrepareDataQuery(Q_(R"(
+ SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "11111";
+ )")).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
+ auto dataQuery = prepareResult.GetQuery();
+
+ auto settings = TExecDataQuerySettings();
+ if (status == EStatus::TIMEOUT) {
+ settings.OperationTimeout(TDuration::MilliSeconds(100));
+ } else {
+ settings.CancelAfter(TDuration::MilliSeconds(100));
+ }
- auto result = dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync();
+ auto result = dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync();
- result.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CANCELLED);
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
+ }
}
Y_UNIT_TEST(QueryExecTimeout) {
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 400361d6fe4..23e4d780639 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -2059,7 +2059,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, StreamLookup
TString tmpSessionId = CreateSessionRPC(runtime);
auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 3;"), tmpSessionId, "", true);
req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1);
- req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1);
auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req)));
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT);
}
@@ -2085,7 +2084,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, StreamLookup
TString tmpSessionId = CreateSessionRPC(runtime);
auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 3;"), tmpSessionId, "", true);
req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1);
- req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1);
auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req)));
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT);
}
@@ -2229,7 +2227,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, StreamLookup)
auto sender4 = CreateSessionRPC(runtime);
auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender4, "", true);
req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1);
- req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1);
auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req)));
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT);
}
@@ -2270,7 +2267,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, StreamLookup)
auto sender5 = CreateSessionRPC(runtime);
auto req = MakeSimpleRequestRPC(Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 7;"), sender5, "", true);
req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(1);
- req.mutable_operation_params()->mutable_cancel_after()->set_seconds(1);
auto response = AwaitResponse(runtime, SendRequest(runtime, std::move(req)));
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::TIMEOUT);
}