diff options
author | dcherednik <dcherednik@ydb.tech> | 2022-12-28 18:00:52 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2022-12-28 18:00:52 +0300 |
commit | 5a46fb31349535debfce63d3f6d080cf4f91821d (patch) | |
tree | 53211535193f07e79bdc18b33fa18e7296aa73ab | |
parent | 29d63e5c12b976eb2070e1013f8a7ca4f9d7b1af (diff) | |
download | ydb-5a46fb31349535debfce63d3f6d080cf4f91821d.tar.gz |
Pass client lost notification to executor.
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 47 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_query_ut.cpp | 121 |
3 files changed, 120 insertions, 60 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index db861f006d..ffa2e874a1 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -152,13 +152,13 @@ public: void CheckExecutionComplete() { ui32 notFinished = 0; - for (auto& x : ShardStates) { + for (const auto& x : ShardStates) { if (x.second.State != TShardState::EState::Finished) { notFinished++; LOG_D("Datashard " << x.first << " not finished yet: " << ToString(x.second.State)); } } - for (auto& x : TopicTabletStates) { + for (const auto& x : TopicTabletStates) { if (x.second.State != TShardState::EState::Finished) { ++notFinished; LOG_D("TopicTablet " << x.first << " not finished yet: " << ToString(x.second.State)); @@ -171,15 +171,15 @@ public: if (IsDebugLogEnabled()) { auto sb = TStringBuilder() << "Waiting for " << PendingComputeActors.size() << " compute actor(s) and " << notFinished << " datashard(s): "; - for (auto shardId : PendingComputeActors) { + for (const auto& shardId : PendingComputeActors) { sb << "CA " << shardId.first << ", "; } - for (auto& [shardId, shardState] : ShardStates) { + for (const auto& [shardId, shardState] : ShardStates) { if (shardState.State != TShardState::EState::Finished) { sb << "DS " << shardId << " (" << ToString(shardState.State) << "), "; } } - for (auto& [tabletId, tabletState] : TopicTabletStates) { + for (const auto& [tabletId, tabletState] : TopicTabletStates) { if (tabletState.State != TShardState::EState::Finished) { sb << "PQ " << tabletId << " (" << ToString(tabletState.State) << "), "; } @@ -642,7 +642,7 @@ private: } void CheckPrepareCompleted() { - for (auto& [_, state] : ShardStates) { + for (const auto& [_, state] : ShardStates) { if (state.State != TShardState::EState::Prepared) { LOG_D("Not all shards are prepared, waiting..."); return; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index cf715513f1..0d1716f289 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -253,7 +253,8 @@ public: TString LogPrefix() const { TStringBuilder result = TStringBuilder() << "SessionId: " << SessionId << ", " - << "ActorId: " << SelfId() << ", "; + << "ActorId: " << SelfId() << ", " + << "ActorState: " << CurrentStateFuncName() << ", "; if (Y_LIKELY(QueryState)) { result << "TraceId: " << QueryState->TraceId << ", "; } @@ -392,6 +393,14 @@ public: void HandleWakeup(TEvents::TEvWakeup::TPtr &ev) { switch ((EWakeupTag) ev->Get()->Tag) { case EWakeupTag::ClientLost: + LOG_D("Got TEvWakeup ClientLost event, send AbortExecution to executor: " + << ExecuterId); + + if (ExecuterId) { + auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here + + Send(ExecuterId, abortEv.Release()); + } Cleanup(); break; default: @@ -1278,6 +1287,21 @@ public: void HandleNoop(T&) { } + void HandleTxResponse(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { + if (ev->Sender == ExecuterId) { + auto& response = ev->Get()->Record.GetResponse(); + TIssues issues; + IssuesFromMessage(response.GetIssues(), issues); + + auto err = TStringBuilder() << "Got response from our executor: " << ev->Sender + << ", Status: " << ev->Get()->Record.GetResponse().GetStatus() + << ", Issues: " << issues.ToString() + << " while we are in " << CurrentStateFuncName(); + LOG_E(err); + YQL_ENSURE(false, "" << err); + } + } + void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) { ReplyBusy(ev); } @@ -2014,6 +2038,7 @@ public: StartIdleTimer(); Become(&TKqpSessionActor::ReadyState); } + ExecuterId = TActorId{}; } template<class T> @@ -2070,6 +2095,7 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); + hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse); default: UnexpectedEvent("ReadyState", ev); } @@ -2091,6 +2117,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); + hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse); hFunc(TEvents::TEvWakeup, HandleWakeup); default: UnexpectedEvent("CompileState", ev); @@ -2183,6 +2210,24 @@ public: } private: + + TString CurrentStateFuncName() const { + const auto& func = CurrentStateFunc(); + if (func == &TThis::ReadyState) { + return "ReadyState"; + } else if (func == &TThis::ExecuteState) { + return "ExecuteState"; + } else if (func == &TThis::TopicOpsState) { + return "TopicOpsState"; + } else if (func == &TThis::CompileState) { + return "CompileState"; + } else if (func == &TThis::CleanupState) { + return "CleanupState"; + } else { + return "unknown state"; + } + } + void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) { InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " recieve unexpected event " << TypeName(*ev.Get()->GetBase()) << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index b971cc1db6..91c1f4b7ba 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -17,6 +17,22 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; +static void CheckStatusAfterTimeout(TSession& session, const TString& query, const TTxControl& txControl) { + const TInstant start = TInstant::Now(); + while (true) { + auto result = session.ExecuteDataQuery(query, txControl).ExtractValueSync(); + Cerr << "STATUS: " << result.GetStatus() << " " << result.GetIssues().ToString() << Endl; + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString()); + if (result.GetStatus() == EStatus::SUCCESS) { + break; + } + + UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after client lost"); + // Do not fire too much CPU + Sleep(TDuration::MilliSeconds(10)); + } +} + Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(PreparedQueryInvalidate) { TKikimrRunner kikimr; @@ -285,71 +301,70 @@ Y_UNIT_TEST_SUITE(KqpQuery) { } Y_UNIT_TEST(QueryClientTimeout) { - TStringStream logStream; + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); - { - TKikimrRunner kikimr( - TKikimrSettings() - .SetLogStream(&logStream)); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); + auto query = Q_(R"( + SELECT * FROM `/Root/TwoShard`; + )"); - auto query = Q_(R"( - SELECT * FROM `/Root/TwoShard`; - )"); + auto txControl = TTxControl::BeginTx().CommitTx(); - auto txControl = TTxControl::BeginTx().CommitTx(); + NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1); - NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1); + auto result = session.ExecuteDataQuery( + query, + txControl, + TExecDataQuerySettings() + .UseClientTimeoutForOperation(false) + .ClientTimeout(TDuration::Seconds(3)) + ).ExtractValueSync(); - auto result = session.ExecuteDataQuery( - query, - txControl, - TExecDataQuerySettings() - .UseClientTimeoutForOperation(false) - .ClientTimeout(TDuration::Seconds(3)) - ).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED); - result.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED); + NDataShard::gSkipRepliesFailPoint.Disable(); - NDataShard::gSkipRepliesFailPoint.Disable(); + CheckStatusAfterTimeout(session, query, txControl); + } - const TInstant start = TInstant::Now(); - while (true) { - result = session.ExecuteDataQuery(query, txControl).ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString()); - if (result.GetStatus() == EStatus::SUCCESS) { - break; - } + Y_UNIT_TEST(QueryClientTimeoutPrecompiled) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); - UNIT_ASSERT_C(TInstant::Now() - start < TDuration::Seconds(30), "Unable to cancel processing after client lost"); - // Do not fire too much CPU - Sleep(TDuration::MilliSeconds(10)); - } - } + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); - TString line; - int v = 0; - while (logStream.ReadLine(line)) { - if (line.find("ExecuteDataQuery") == line.npos) - continue; - const TString pattern("timeout# "); - auto start = line.find(pattern); - if (start == line.npos) - continue; - // We just want integer part - auto end = line.find('.', start + pattern.size()); - UNIT_ASSERT(end > start + pattern.size()); //something wrong with string - UNIT_ASSERT(end != line.npos); - TString val = line.substr(start + pattern.size(), end - start - pattern.size()); - v = std::stoi(val); - } + auto query = Q_(R"( + SELECT * FROM `/Root/TwoShard`; + )"); + + auto prepareResult = session.PrepareDataQuery( + query + ).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(prepareResult.GetStatus(), EStatus::SUCCESS); + + auto txControl = TTxControl::BeginTx().CommitTx(); + + NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1); + + auto result = prepareResult.GetQuery().Execute( + txControl, + TExecDataQuerySettings() + .UseClientTimeoutForOperation(false) + .ClientTimeout(TDuration::Seconds(3)) + ).ExtractValueSync(); + + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::CLIENT_DEADLINE_EXCEEDED); + + NDataShard::gSkipRepliesFailPoint.Disable(); - UNIT_ASSERT(v); - UNIT_ASSERT_C(v <= 3, "got " << v); + CheckStatusAfterTimeout(session, query, txControl); } Y_UNIT_TEST(QueryCancel) { |