aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-09-04 17:35:07 +0300
committergvit <gvit@ydb.tech>2023-09-04 18:06:02 +0300
commit6150a5181282cea17949009ca6517b609c29c6f7 (patch)
treeba1dafa75acc4f3766c3d0cf3cbbe22975754df4
parente66c95c6ac3626a427da3f53707135d6e1f6dc82 (diff)
downloadydb-6150a5181282cea17949009ca6517b609c29c6f7.tar.gz
implement compilation cancellation, add logs to tests, trying to fix test KIKIMR-19237
-rw-r--r--ydb/core/kqp/common/compilation/events.h14
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp32
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.cpp8
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h4
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp11
-rw-r--r--ydb/core/kqp/ut/service/kqp_service_ut.cpp11
6 files changed, 61 insertions, 19 deletions
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h
index e36b103707b..6af53925d02 100644
--- a/ydb/core/kqp/common/compilation/events.h
+++ b/ydb/core/kqp/common/compilation/events.h
@@ -13,7 +13,7 @@ namespace NKikimr::NKqp::NPrivateEvents {
struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, TInstant deadline,
- TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {},
+ TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
: UserToken(userToken)
, Uid(uid)
@@ -22,7 +22,9 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
, Deadline(deadline)
, DbCounters(dbCounters)
, Orbit(std::move(orbit))
- , TempTablesState(std::move(tempTablesState)) {
+ , TempTablesState(std::move(tempTablesState))
+ , IntrestedInResult(std::move(intrestedInResult))
+ {
Y_ENSURE(Uid.Defined() != Query.Defined());
}
@@ -38,12 +40,13 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
NLWTrace::TOrbit Orbit;
TKqpTempTablesState::TConstPtr TempTablesState;
+ std::shared_ptr<std::atomic<bool>> IntrestedInResult;
};
struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid,
const TMaybe<TKqpQueryId>& query, TInstant deadline,
- TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {},
+ TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
: UserToken(userToken)
, Uid(uid)
@@ -51,7 +54,9 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, Deadline(deadline)
, DbCounters(dbCounters)
, Orbit(std::move(orbit))
- , TempTablesState(std::move(tempTablesState)) {
+ , TempTablesState(std::move(tempTablesState))
+ , IntrestedInResult(std::move(intrestedInResult))
+ {
}
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
@@ -64,6 +69,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
NLWTrace::TOrbit Orbit;
TKqpTempTablesState::TConstPtr TempTablesState;
+ std::shared_ptr<std::atomic<bool>> IntrestedInResult;
};
struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 7bd700c4a86..24761a849a4 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -178,7 +178,8 @@ private:
struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
- ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {},
+ ui64 cookie, std::shared_ptr<std::atomic<bool>> intrestedInResult,
+ NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {},
TKqpTempTablesState::TConstPtr tempTablesState = {})
: Sender(sender)
, Query(std::move(query))
@@ -191,6 +192,7 @@ struct TKqpCompileRequest {
, CompileServiceSpan(std::move(span))
, Cookie(cookie)
, TempTablesState(std::move(tempTablesState))
+ , IntrestedInResult(std::move(intrestedInResult))
{}
TActorId Sender;
@@ -206,6 +208,11 @@ struct TKqpCompileRequest {
NWilson::TSpan CompileServiceSpan;
ui64 Cookie;
TKqpTempTablesState::TConstPtr TempTablesState;
+ std::shared_ptr<std::atomic<bool>> IntrestedInResult;
+
+ bool IsIntrestedInResult() const {
+ return IntrestedInResult->load();
+ }
};
class TKqpRequestsQueue {
@@ -236,13 +243,26 @@ public:
}
TMaybe<TKqpCompileRequest> Dequeue() {
- for (auto it = Queue.begin(); it != Queue.end(); ++it) {
+ auto it = Queue.begin();
+
+ while (it != Queue.end()) {
auto& request = *it;
+ auto curIt = it++;
+
+ if (!request.IsIntrestedInResult()) {
+ auto result = std::move(request);
+ LOG_DEBUG(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
+ "Drop compilation request because session is not longer wait for response");
+ QueryIndex[result.Query].erase(curIt);
+ Queue.erase(curIt);
+ continue;
+ }
+
if (!ActiveRequests.contains(request.Query)) {
auto result = std::move(request);
- QueryIndex[result.Query].erase(it);
- Queue.erase(it);
+ QueryIndex[result.Query].erase(curIt);
+ Queue.erase(curIt);
return result;
}
@@ -531,7 +551,7 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
- ev->Cookie,
+ ev->Cookie, std::move(ev->Get()->IntrestedInResult),
std::move(ev->Get()->Orbit), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
@@ -581,7 +601,7 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
true, request.UserToken, request.Deadline, dbCounters,
- ev->Cookie,
+ ev->Cookie, std::move(ev->Get()->IntrestedInResult),
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState));
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
index ccd842f5048..8bc47fa74f4 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -120,7 +120,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
return true;
}
-std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() {
+std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) {
TMaybe<TKqpQueryId> query;
TMaybe<TString> uid;
@@ -161,10 +161,10 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest()
}
return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, uid,
- std::move(query), keepInCache, compileDeadline, DbCounters, std::move(Orbit), TempTablesState);
+ std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState);
}
-std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest() {
+std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) {
YQL_ENSURE(CompileResult);
TMaybe<TKqpQueryId> query;
TMaybe<TString> uid;
@@ -197,7 +197,7 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
}
return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid,
- CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit), TempTablesState);
+ CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState);
}
void TKqpQueryState::AddOffsetsToTransaction() {
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 21bdaafaaff..36c61a07588 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -354,10 +354,10 @@ public:
// same the context of the compiled query to the query state.
bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev);
// build the compilation request.
- std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest();
+ std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie);
// TODO(gvit): get rid of code duplication in these requests,
// use only one of these requests.
- std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest();
+ std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie);
const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
return RequestEv->GetYdbParameters();
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index d0705e0132a..ad621575501 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -170,6 +170,7 @@ public:
RequestCounters->Counters = Counters;
RequestCounters->DbCounters = Settings.DbCounters;
RequestCounters->TxProxyMon = MakeIntrusive<NTxProxy::TTxProxyMon>(AppData()->Counters);
+ CompilationCookie = std::make_shared<std::atomic<bool>>(true);
FillSettings.AllResultsBytesLimit = Nothing();
FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef();
@@ -459,7 +460,7 @@ public:
void CompileQuery() {
YQL_ENSURE(QueryState);
- auto ev = QueryState->BuildCompileRequest();
+ auto ev = QueryState->BuildCompileRequest(CompilationCookie);
LOG_D("Sending CompileQuery request");
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
Become(&TKqpSessionActor::CompileState);
@@ -479,7 +480,7 @@ public:
// table versions are not the same. need the query recompilation.
if (!QueryState->EnsureTableVersions(*response)) {
- auto ev = QueryState->BuildReCompileRequest();
+ auto ev = QueryState->BuildReCompileRequest(CompilationCookie);
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
return;
}
@@ -1708,6 +1709,11 @@ public:
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
if (isFinal) {
+ // no longer intrested in any compilation responses
+ CompilationCookie->store(false);
+ }
+
+ if (isFinal) {
Transactions.FinalCleanup();
Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize());
}
@@ -2151,6 +2157,7 @@ private:
TKqpTempTablesState TempTablesState;
NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
+ std::shared_ptr<std::atomic<bool>> CompilationCookie;
};
} // namespace
diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
index 4eed1d3124f..dd462b00e4b 100644
--- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp
@@ -67,6 +67,11 @@ Y_UNIT_TEST_SUITE(KqpService) {
Y_UNIT_TEST(CloseSessionsWithLoad) {
auto kikimr = std::make_shared<TKikimrRunner>();
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
+ kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);
+
auto db = kikimr->GetTableClient();
const ui32 SessionsCount = 50;
@@ -84,11 +89,13 @@ Y_UNIT_TEST_SUITE(KqpService) {
NPar::LocalExecutor().ExecRange([kikimr, sessions, WaitDuration](int id) mutable {
if (id == (i32)sessions.size()) {
Sleep(WaitDuration);
-
+ Cerr << "start sessions close....." << Endl;
for (ui32 i = 0; i < sessions.size(); ++i) {
sessions[i].Close();
}
+ Cerr << "finished sessions close....." << Endl;
+
return;
}
@@ -120,6 +127,8 @@ Y_UNIT_TEST_SUITE(KqpService) {
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx()).GetValueSync();
if (!result.IsSuccess()) {
+ Sleep(TDuration::Seconds(5));
+ Cerr << "received non-success status for session " << id << Endl;
return;
}