about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: dcherednik <dcherednik@ydb.tech> 2022-12-28 18:00:52 +0300
committer: dcherednik <dcherednik@ydb.tech> 2022-12-28 18:00:52 +0300
commit: 5a46fb31349535debfce63d3f6d080cf4f91821d (patch)
tree: 53211535193f07e79bdc18b33fa18e7296aa73ab
parent: 29d63e5c12b976eb2070e1013f8a7ca4f9d7b1af (diff)
download: ydb-5a46fb31349535debfce63d3f6d080cf4f91821d.tar.gz
Pass client lost notification to executor.
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 12
-rw-r--r-- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 47
-rw-r--r-- ydb/core/kqp/ut/query/kqp_query_ut.cpp | 121
3 files changed, 120 insertions, 60 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index db861f006d..ffa2e874a1 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -152,13 +152,13 @@ public:
void CheckExecutionComplete() {
ui32 notFinished = 0;
- for (auto& x : ShardStates) {
+ for (const auto& x : ShardStates) {
if (x.second.State != TShardState::EState::Finished) {
notFinished++;
LOG_D("Datashard " << x.first << " not finished yet: " << ToString(x.second.State));
}
}
- for (auto& x : TopicTabletStates) {
+ for (const auto& x : TopicTabletStates) {
if (x.second.State != TShardState::EState::Finished) {
++notFinished;
LOG_D("TopicTablet " << x.first << " not finished yet: " << ToString(x.second.State));
@@ -171,15 +171,15 @@ public:
if (IsDebugLogEnabled()) {
auto sb = TStringBuilder() << "Waiting for " << PendingComputeActors.size() << " compute actor(s) and "
<< notFinished << " datashard(s): ";
- for (auto shardId : PendingComputeActors) {
+ for (const auto& shardId : PendingComputeActors) {
sb << "CA " << shardId.first << ", ";
}
- for (auto& [shardId, shardState] : ShardStates) {
+ for (const auto& [shardId, shardState] : ShardStates) {
if (shardState.State != TShardState::EState::Finished) {
sb << "DS " << shardId << " (" << ToString(shardState.State) << "), ";
}
}
- for (auto& [tabletId, tabletState] : TopicTabletStates) {
+ for (const auto& [tabletId, tabletState] : TopicTabletStates) {
if (tabletState.State != TShardState::EState::Finished) {
sb << "PQ " << tabletId << " (" << ToString(tabletState.State) << "), ";
}
@@ -642,7 +642,7 @@ private:
}
void CheckPrepareCompleted() {
- for (auto& [_, state] : ShardStates) {
+ for (const auto& [_, state] : ShardStates) {
if (state.State != TShardState::EState::Prepared) {
LOG_D("Not all shards are prepared, waiting...");
return;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index cf715513f1..0d1716f289 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -253,7 +253,8 @@ public:
TString LogPrefix() const {
TStringBuilder result = TStringBuilder()
<< "SessionId: " << SessionId << ", "
- << "ActorId: " << SelfId() << ", ";
+ << "ActorId: " << SelfId() << ", "
+ << "ActorState: " << CurrentStateFuncName() << ", ";
if (Y_LIKELY(QueryState)) {
result << "TraceId: " << QueryState->TraceId << ", ";
}
@@ -392,6 +393,14 @@ public:
void HandleWakeup(TEvents::TEvWakeup::TPtr &ev) {
switch ((EWakeupTag) ev->Get()->Tag) {
case EWakeupTag::ClientLost:
+ LOG_D("Got TEvWakeup ClientLost event, send AbortExecution to executor: "
+ << ExecuterId);
+
+ if (ExecuterId) {
+ auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here
+
+ Send(ExecuterId, abortEv.Release());
+ }
Cleanup();
break;
default:
@@ -1278,6 +1287,21 @@ public:
void HandleNoop(T&) {
}
+ void HandleTxResponse(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
+ if (ev->Sender == ExecuterId) {
+ auto& response = ev->Get()->Record.GetResponse();
+ TIssues issues;
+ IssuesFromMessage(response.GetIssues(), issues);
+
+ auto err = TStringBuilder() << "Got response from our executor: " << ev->Sender
+ << ", Status: " << ev->Get()->Record.GetResponse().GetStatus()
+ << ", Issues: " << issues.ToString()
+ << " while we are in " << CurrentStateFuncName();
+ LOG_E(err);
+ YQL_ENSURE(false, "" << err);
+ }
+ }
+
void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) {
ReplyBusy(ev);
}
@@ -2014,6 +2038,7 @@ public:
StartIdleTimer();
Become(&TKqpSessionActor::ReadyState);
}
+ ExecuterId = TActorId{};
}
template<class T>
@@ -2070,6 +2095,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse);
default:
UnexpectedEvent("ReadyState", ev);
}
@@ -2091,6 +2117,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
+ hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse);
hFunc(TEvents::TEvWakeup, HandleWakeup);
default:
UnexpectedEvent("CompileState", ev);
@@ -2183,6 +2210,24 @@ public:
}
private:
+
+ TString CurrentStateFuncName() const {
+ const auto& func = CurrentStateFunc();
+ if (func == &TThis::ReadyState) {
+ return "ReadyState";
+ } else if (func == &TThis::ExecuteState) {
+ return "ExecuteState";
+ } else if (func == &TThis::TopicOpsState) {
+ return "TopicOpsState";
+ } else if (func == &TThis::CompileState) {
+ return "CompileState";
+ } else if (func == &TThis::CleanupState) {
+ return "CleanupState";
+ } else {
+ return "unknown state";
+ }
+ }
+
void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " recieve unexpected event " <<
TypeName(*ev.Get()->GetBase()) << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index b971cc1db6..91c1f4b7ba 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -17,6 +17,22 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;
+static void CheckStatusAfterTimeout(TSession& session, const TString& query, const TTxControl& txControl) {
+ const TInstant start = TInstant::Now();
+ while (true) {
+ auto result = session.ExecuteDataQuery(query, txControl).ExtractValueSync();
+ Cerr << "STATUS: " << result.GetStatus() << " " << result.GetIssues().ToString() << Endl;
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString());
+ if (result.GetStatus() == EStatus::SUCCESS) {
+ break;
+ }
+
+ UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after client lost");
+ // Do not fire too much CPU
+ Sleep(TDuration::MilliSeconds(10));
+ }
+}
+
Y_UNIT_TEST_SUITE(KqpQuery) {
Y_UNIT_TEST(PreparedQueryInvalidate) {
TKikimrRunner kikimr;
@@ -285,71 +301,70 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
}
Y_UNIT_TEST(QueryClientTimeout) {
- TStringStream logStream;
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
- {
- TKikimrRunner kikimr(
- TKikimrSettings()
- .SetLogStream(&logStream));
- auto db = kikimr.GetTableClient();
- auto session = db.CreateSession().GetValueSync().GetSession();
+ kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
+ auto query = Q_(R"(
+ SELECT * FROM `/Root/TwoShard`;
+ )");
- auto query = Q_(R"(
- SELECT * FROM `/Root/TwoShard`;
- )");
+ auto txControl = TTxControl::BeginTx().CommitTx();
- auto txControl = TTxControl::BeginTx().CommitTx();
+ NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1);
- NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1);
+ auto result = session.ExecuteDataQuery(
+ query,
+ txControl,
+ TExecDataQuerySettings()
+ .UseClientTimeoutForOperation(false)
+ .ClientTimeout(TDuration::Seconds(3))
+ ).ExtractValueSync();
- auto result = session.ExecuteDataQuery(
- query,
- txControl,
- TExecDataQuerySettings()
- .UseClientTimeoutForOperation(false)
- .ClientTimeout(TDuration::Seconds(3))
- ).ExtractValueSync();
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED);
- result.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED);
+ NDataShard::gSkipRepliesFailPoint.Disable();
- NDataShard::gSkipRepliesFailPoint.Disable();
+ CheckStatusAfterTimeout(session, query, txControl);
+ }
- const TInstant start = TInstant::Now();
- while (true) {
- result = session.ExecuteDataQuery(query, txControl).ExtractValueSync();
- UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString());
- if (result.GetStatus() == EStatus::SUCCESS) {
- break;
- }
+ Y_UNIT_TEST(QueryClientTimeoutPrecompiled) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
- UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after client lost");
- // Do not fire too much CPU
- Sleep(TDuration::MilliSeconds(10));
- }
- }
+ kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
- TString line;
- int v = 0;
- while (logStream.ReadLine(line)) {
- if (line.find("ExecuteDataQuery") == line.npos)
- continue;
- const TString pattern("timeout# ");
- auto start = line.find(pattern);
- if (start == line.npos)
- continue;
- // We just want integer part
- auto end = line.find('.', start + pattern.size());
- UNIT_ASSERT(end > start + pattern.size()); //something wrong with string
- UNIT_ASSERT(end != line.npos);
- TString val = line.substr(start + pattern.size(), end - start - pattern.size());
- v = std::stoi(val);
- }
+ auto query = Q_(R"(
+ SELECT * FROM `/Root/TwoShard`;
+ )");
+
+ auto prepareResult = session.PrepareDataQuery(
+ query
+ ).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(prepareResult.GetStatus(), EStatus::SUCCESS);
+
+ auto txControl = TTxControl::BeginTx().CommitTx();
+
+ NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1);
+
+ auto result = prepareResult.GetQuery().Execute(
+ txControl,
+ TExecDataQuerySettings()
+ .UseClientTimeoutForOperation(false)
+ .ClientTimeout(TDuration::Seconds(3))
+ ).ExtractValueSync();
+
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED);
+
+ NDataShard::gSkipRepliesFailPoint.Disable();
- UNIT_ASSERT(v);
- UNIT_ASSERT_C(v <= 3, "got " << v);
+ CheckStatusAfterTimeout(session, query, txControl);
}
Y_UNIT_TEST(QueryCancel) {