diff options
author | gvit <gvit@ydb.tech> | 2023-09-04 17:35:07 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-09-04 18:06:02 +0300 |
commit | 6150a5181282cea17949009ca6517b609c29c6f7 (patch) | |
tree | ba1dafa75acc4f3766c3d0cf3cbbe22975754df4 | |
parent | e66c95c6ac3626a427da3f53707135d6e1f6dc82 (diff) | |
download | ydb-6150a5181282cea17949009ca6517b609c29c6f7.tar.gz |
implement compilation cancellation, add logs to tests, trying to fix test KIKIMR-19237
-rw-r--r-- | ydb/core/kqp/common/compilation/events.h | 14 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_service_ut.cpp | 11 |
6 files changed, 61 insertions, 19 deletions
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index e36b103707b..6af53925d02 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -13,7 +13,7 @@ namespace NKikimr::NKqp::NPrivateEvents { struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> { TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, TMaybe<TKqpQueryId>&& query, bool keepInCache, TInstant deadline, - TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, + TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) : UserToken(userToken) , Uid(uid) @@ -22,7 +22,9 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo , Deadline(deadline) , DbCounters(dbCounters) , Orbit(std::move(orbit)) - , TempTablesState(std::move(tempTablesState)) { + , TempTablesState(std::move(tempTablesState)) + , IntrestedInResult(std::move(intrestedInResult)) + { Y_ENSURE(Uid.Defined() != Query.Defined()); } @@ -38,12 +40,13 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo NLWTrace::TOrbit Orbit; TKqpTempTablesState::TConstPtr TempTablesState; + std::shared_ptr<std::atomic<bool>> IntrestedInResult; }; struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> { TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid, const TMaybe<TKqpQueryId>& query, TInstant deadline, - TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, + TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) : UserToken(userToken) , Uid(uid) @@ -51,7 +54,9 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents:: , Deadline(deadline) , DbCounters(dbCounters) , Orbit(std::move(orbit)) - , TempTablesState(std::move(tempTablesState)) { + , TempTablesState(std::move(tempTablesState)) + , IntrestedInResult(std::move(intrestedInResult)) + { } TIntrusiveConstPtr<NACLib::TUserToken> UserToken; @@ -64,6 +69,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents:: NLWTrace::TOrbit Orbit; TKqpTempTablesState::TConstPtr TempTablesState; + std::shared_ptr<std::atomic<bool>> IntrestedInResult; }; struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> { diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 7bd700c4a86..24761a849a4 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -178,7 +178,8 @@ private: struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, - ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}, + ui64 cookie, std::shared_ptr<std::atomic<bool>> intrestedInResult, + NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}, TKqpTempTablesState::TConstPtr tempTablesState = {}) : Sender(sender) , Query(std::move(query)) @@ -191,6 +192,7 @@ struct TKqpCompileRequest { , CompileServiceSpan(std::move(span)) , Cookie(cookie) , TempTablesState(std::move(tempTablesState)) + , IntrestedInResult(std::move(intrestedInResult)) {} TActorId Sender; @@ -206,6 +208,11 @@ struct TKqpCompileRequest { NWilson::TSpan CompileServiceSpan; ui64 Cookie; TKqpTempTablesState::TConstPtr TempTablesState; + std::shared_ptr<std::atomic<bool>> IntrestedInResult; + + bool IsIntrestedInResult() const { + return IntrestedInResult->load(); + } }; class TKqpRequestsQueue { @@ -236,13 +243,26 @@ public: } TMaybe<TKqpCompileRequest> Dequeue() { - for (auto it = Queue.begin(); it != Queue.end(); ++it) { + auto it = Queue.begin(); + + while (it != Queue.end()) { auto& request = *it; + auto curIt = it++; + + if (!request.IsIntrestedInResult()) { + auto result = std::move(request); + LOG_DEBUG(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, + "Drop compilation request because session is not longer wait for response"); + QueryIndex[result.Query].erase(curIt); + Queue.erase(curIt); + continue; + } + if (!ActiveRequests.contains(request.Query)) { auto result = std::move(request); - QueryIndex[result.Query].erase(it); - Queue.erase(it); + QueryIndex[result.Query].erase(curIt); + Queue.erase(curIt); return result; } @@ -531,7 +551,7 @@ private: TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), request.KeepInCache, request.UserToken, request.Deadline, dbCounters, - ev->Cookie, + ev->Cookie, std::move(ev->Get()->IntrestedInResult), std::move(ev->Get()->Orbit), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -581,7 +601,7 @@ private: TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, true, request.UserToken, request.Deadline, dbCounters, - ev->Cookie, + ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState)); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index ccd842f5048..8bc47fa74f4 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -120,7 +120,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) { return true; } -std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() { +std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) { TMaybe<TKqpQueryId> query; TMaybe<TString> uid; @@ -161,10 +161,10 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() } return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, uid, - std::move(query), keepInCache, compileDeadline, DbCounters, std::move(Orbit), TempTablesState); + std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState); } -std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest() { +std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) { YQL_ENSURE(CompileResult); TMaybe<TKqpQueryId> query; TMaybe<TString> uid; @@ -197,7 +197,7 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque } return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, - CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit), TempTablesState); + CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState); } void TKqpQueryState::AddOffsetsToTransaction() { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 21bdaafaaff..36c61a07588 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -354,10 +354,10 @@ public: // same the context of the compiled query to the query state. bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev); // build the compilation request. - std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest(); + std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie); // TODO(gvit): get rid of code duplication in these requests, // use only one of these requests. - std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest(); + std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie); const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const { return RequestEv->GetYdbParameters(); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index d0705e0132a..ad621575501 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -170,6 +170,7 @@ public: RequestCounters->Counters = Counters; RequestCounters->DbCounters = Settings.DbCounters; RequestCounters->TxProxyMon = MakeIntrusive<NTxProxy::TTxProxyMon>(AppData()->Counters); + CompilationCookie = std::make_shared<std::atomic<bool>>(true); FillSettings.AllResultsBytesLimit = Nothing(); FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef(); @@ -459,7 +460,7 @@ public: void CompileQuery() { YQL_ENSURE(QueryState); - auto ev = QueryState->BuildCompileRequest(); + auto ev = QueryState->BuildCompileRequest(CompilationCookie); LOG_D("Sending CompileQuery request"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); Become(&TKqpSessionActor::CompileState); @@ -479,7 +480,7 @@ public: // table versions are not the same. need the query recompilation. if (!QueryState->EnsureTableVersions(*response)) { - auto ev = QueryState->BuildReCompileRequest(); + auto ev = QueryState->BuildReCompileRequest(CompilationCookie); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); return; } @@ -1708,6 +1709,11 @@ public: Counters->ReportSessionActorClosedRequest(Settings.DbCounters); if (isFinal) { + // no longer intrested in any compilation responses + CompilationCookie->store(false); + } + + if (isFinal) { Transactions.FinalCleanup(); Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize()); } @@ -2151,6 +2157,7 @@ private: TKqpTempTablesState TempTablesState; NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; + std::shared_ptr<std::atomic<bool>> CompilationCookie; }; } // namespace diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp index 4eed1d3124f..dd462b00e4b 100644 --- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp @@ -67,6 +67,11 @@ Y_UNIT_TEST_SUITE(KqpService) { Y_UNIT_TEST(CloseSessionsWithLoad) { auto kikimr = std::make_shared<TKikimrRunner>(); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG); + auto db = kikimr->GetTableClient(); const ui32 SessionsCount = 50; @@ -84,11 +89,13 @@ Y_UNIT_TEST_SUITE(KqpService) { NPar::LocalExecutor().ExecRange([kikimr, sessions, WaitDuration](int id) mutable { if (id == (i32)sessions.size()) { Sleep(WaitDuration); - + Cerr << "start sessions close....." << Endl; for (ui32 i = 0; i < sessions.size(); ++i) { sessions[i].Close(); } + Cerr << "finished sessions close....." << Endl; + return; } @@ -120,6 +127,8 @@ Y_UNIT_TEST_SUITE(KqpService) { auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx()).GetValueSync(); if (!result.IsSuccess()) { + Sleep(TDuration::Seconds(5)); + Cerr << "received non-success status for session " << id << Endl; return; } |