aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-03-17 09:56:11 +0300
committergvit <gvit@ydb.tech>2023-03-17 09:56:11 +0300
commitb968d8bab7e871258ee24a0b36cbc52f54c57e93 (patch)
treee6afb4ef2bbdc2ab4c7401d6a02ee24fe5525118
parent702e198576ad3a00300f3f321dfef634f347a971 (diff)
downloadydb-b968d8bab7e871258ee24a0b36cbc52f54c57e93.tar.gz
decouple kqp session actor state into the separate file and introduce cookies to differentiate events; get rid of compile request actor
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_request.cpp339
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp55
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.h2
-rw-r--r--ydb/core/kqp/compile_service/ya.make1
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.cpp192
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h288
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp329
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.h2
-rw-r--r--ydb/core/kqp/session_actor/ya.make1
17 files changed, 582 insertions, 635 deletions
diff --git a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
index c8431e865bd..b3ac2e0be94 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
@@ -21,6 +21,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
index 985a5c70ff3..e786b9cc559 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
@@ -22,6 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
index 985a5c70ff3..e786b9cc559 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
@@ -22,6 +22,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
index c8431e865bd..b3ac2e0be94 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
@@ -21,6 +21,5 @@ target_link_libraries(core-kqp-compile_service PUBLIC
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_request.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_service.cpp
)
diff --git a/ydb/core/kqp/compile_service/kqp_compile_request.cpp b/ydb/core/kqp/compile_service/kqp_compile_request.cpp
deleted file mode 100644
index 3b5c7e55622..00000000000
--- a/ydb/core/kqp/compile_service/kqp_compile_request.cpp
+++ /dev/null
@@ -1,339 +0,0 @@
-#include "kqp_compile_service.h"
-
-#include <ydb/core/actorlib_impl/long_timer.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/actors/wilson/wilson_span.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/actors/core/log.h>
-
-#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
-#include <ydb/core/base/wilson.h>
-
-#include <util/string/escape.h>
-
-LWTRACE_USING(KQP_PROVIDER);
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NSchemeCache;
-using namespace NYql;
-
-class TKqpCompileRequestActor : public TActorBootstrapped<TKqpCompileRequestActor> {
-public:
- using TBase = TActorBootstrapped<TKqpCompileRequestActor>;
-
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::KQP_COMPILE_REQUEST;
- }
-
- TKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
- NWilson::TTraceId traceId)
- : Owner(owner)
- , UserToken(userToken)
- , Uid(uid)
- , Query(std::move(query))
- , KeepInCache(keepInCache)
- , Deadline(deadline)
- , DbCounters(dbCounters)
- , Orbit{std::move(orbit)}
- , CompileRequestSpan(TWilsonKqp::CompileRequest, std::move(traceId), "CompileRequest") {}
-
- void Bootstrap(const TActorContext& ctx) {
- LWTRACK(KqpCompileRequestBootstrap,
- Orbit,
- Query ? Query->UserSid : 0);
-
- TMaybe<TKqpQueryId> query;
- std::swap(Query, query);
-
- auto compileEv = MakeHolder<TEvKqp::TEvCompileRequest>(UserToken, Uid, std::move(query),
- KeepInCache, Deadline, DbCounters, std::move(Orbit));
- ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), compileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId());
-
- Become(&TKqpCompileRequestActor::MainState);
- }
-
- void Handle(TEvKqp::TEvCompileResponse::TPtr& ev, const TActorContext &ctx) {
- const auto& query = ev->Get()->CompileResult->Query;
- LWTRACK(KqpCompileRequestHandleServiceReply,
- ev->Get()->Orbit,
- query ? query->UserSid : 0);
-
- auto compileResult = ev->Get()->CompileResult;
- const auto& stats = ev->Get()->Stats;
-
- if (compileResult->Status != Ydb::StatusIds::SUCCESS || !stats.GetFromCache()) {
-
- if (CompileRequestSpan) {
- CompileRequestSpan.End();
- }
-
- ctx.Send(Owner, ev.Release());
- Die(ctx);
- return;
- }
-
- if (!NavigateTables(compileResult->PreparedQuery, compileResult->Query->Database, ctx)) {
-
- if (CompileRequestSpan) {
- CompileRequestSpan.End();
- }
-
- ctx.Send(Owner, ev.Release());
- Die(ctx);
- return;
- }
-
- DeferredResponse.Reset(ev.Release());
- }
-
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext &ctx) {
- if (ValidateTables(*ev->Get(), ctx)) {
-
- if (CompileRequestSpan) {
- CompileRequestSpan.EndOk();
- }
-
- ctx.Send(Owner, DeferredResponse.Release());
- Die(ctx);
- return;
- }
-
- auto& compileResult = *DeferredResponse->CompileResult;
-
- LOG_INFO_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Recompiling query due to scheme error"
- << ", self: " << ctx.SelfID
- << ", queryUid: " << compileResult.Uid);
-
- auto recompileEv = MakeHolder<TEvKqp::TEvRecompileRequest>(UserToken, compileResult.Uid, compileResult.Query,
- Deadline, DbCounters, std::move(DeferredResponse->Orbit));
- ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release(), 0, 0, CompileRequestSpan.GetTraceId());
-
- DeferredResponse.Reset();
- }
-
- void Handle(TEvKqp::TEvAbortExecution::TPtr& , const TActorContext &ctx) {
- this->Die(ctx);
- }
-
-private:
- STFUNC(MainState) {
- try {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqp::TEvCompileResponse, Handle);
- HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- HFunc(TEvKqp::TEvAbortExecution, Handle);
- default:
- UnexpectedEvent("MainState", ev->GetTypeRewrite(), ctx);
- }
- } catch (const yexception& e) {
- InternalError(e.what(), ctx);
- }
- }
-
-private:
- void FillTables(const NKqpProto::TKqpPhyTx& phyTx) {
- for (const auto& stage : phyTx.GetStages()) {
- auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) {
- TTableId tableId(table.GetOwnerId(), table.GetTableId());
- auto it = TableVersions.find(tableId);
- if (it != TableVersions.end()) {
- Y_ENSURE(it->second == table.GetVersion());
- } else {
- TableVersions.emplace(tableId, table.GetVersion());
- }
- };
- for (const auto& tableOp : stage.GetTableOps()) {
- addTable(tableOp.GetTable());
- }
- for (const auto& input : stage.GetInputs()) {
- if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
- addTable(input.GetStreamLookup().GetTable());
- }
- }
-
- for (const auto& source : stage.GetSources()) {
- if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
- addTable(source.GetReadRangesSource().GetTable());
- }
- }
- }
- }
-
- bool NavigateTables(const TPreparedQueryHolder::TConstPtr& query, const TString& database, const TActorContext& ctx) {
- TableVersions.clear();
-
- switch (query->GetVersion()) {
- case NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1:
- for (const auto& tx : query->GetPhysicalQuery().GetTransactions()) {
- FillTables(tx);
- }
- break;
-
- default:
- LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST,
- "Unexpected prepared query version"
- << ", self: " << ctx.SelfID
- << ", version: " << (ui32)query->GetVersion());
- return false;
- }
-
- if (TableVersions.empty()) {
- return false;
- }
-
- auto navigate = MakeHolder<TSchemeCacheNavigate>();
- navigate->DatabaseName = database;
- if (UserToken && !UserToken->GetSerializedToken().empty()) {
- navigate->UserToken = UserToken;
- }
-
- for (const auto& [tableId, _] : TableVersions) {
- TSchemeCacheNavigate::TEntry entry;
- entry.TableId = tableId;
- entry.RequestType = TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
- entry.Operation = TSchemeCacheNavigate::EOp::OpTable;
- entry.SyncVersion = false;
- entry.ShowPrivatePath = true;
-
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Query has dependency on table, check the table schema version"
- << ", self: " << ctx.SelfID
- << ", pathId: " << entry.TableId.PathId
- << ", version: " << entry.TableId.SchemaVersion);
-
- navigate->ResultSet.emplace_back(entry);
- }
-
- auto ev = MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(navigate.Release());
- ctx.Send(MakeSchemeCacheID(), ev.Release());
- return true;
- }
-
- bool ValidateTables(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response, const TActorContext& ctx) {
- Y_ENSURE(response.Request);
- const auto& navigate = *response.Request;
-
- for (const auto& entry : navigate.ResultSet) {
- switch (entry.Status) {
- case TSchemeCacheNavigate::EStatus::Ok: {
- auto expectedVersion = TableVersions.FindPtr(TTableId(entry.TableId.PathId));
- if (!expectedVersion) {
- LOG_WARN_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST,
- "Unexpected tableId in scheme cache navigate reply"
- << ", self: " << ctx.SelfID
- << ", tableId: " << entry.TableId);
- continue;
- }
-
- if (!*expectedVersion) {
- // Do not check tables with zero version.
- continue;
- }
-
- if (entry.TableId.SchemaVersion && entry.TableId.SchemaVersion != *expectedVersion) {
- LOG_INFO_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Scheme version mismatch"
- << ", self: " << ctx.SelfID
- << ", pathId: " << entry.TableId.PathId
- << ", expected version: " << *expectedVersion
- << ", actual version: " << entry.TableId.SchemaVersion);
- return false;
- }
-
- break;
- }
-
- case TSchemeCacheNavigate::EStatus::PathErrorUnknown:
- case TSchemeCacheNavigate::EStatus::PathNotTable:
- case TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
- LOG_INFO_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Scheme error"
- << ", self: " << ctx.SelfID
- << ", pathId: " << entry.TableId.PathId
- << ", status: " << entry.Status);
- return false;
-
- case TSchemeCacheNavigate::EStatus::LookupError:
- case TSchemeCacheNavigate::EStatus::RedirectLookupError:
- // Transient error, do not invalidate the query.
- // Hard validation will be performed later during the query execution.
- break;
-
- default:
- // Unexpected reply, do not invalidate the query as it may block the query execution.
- // Hard validation will be performed later during the query execution.
- LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Unexpected reply from scheme cache"
- << ", self: " << ctx.SelfID
- << ", pathId: " << entry.TableId.PathId
- << ", status: " << entry.Status);
- break;
- }
- }
-
- return true;
- }
-
-private:
- void UnexpectedEvent(const TString& state, ui32 eventType, const TActorContext &ctx) {
- InternalError(TStringBuilder() << "TKqpCompileRequestActor, unexpected event: " << eventType
- << ", at state:" << state, ctx);
- }
-
- void InternalError(const TString& message, const TActorContext &ctx) {
- LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_REQUEST, "Internal error"
- << ", self: " << ctx.SelfID
- << ", message: " << message);
-
-
- NYql::TIssue issue(NYql::TPosition(), "Internal error while proccessing query compilation request.");
- issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message));
-
- ReplyError(Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx);
- }
-
- void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx) {
- auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(TKqpCompileResult::Make({}, status, issues, ETableReadType::Other), std::move(Orbit));
-
- if (CompileRequestSpan) {
- CompileRequestSpan.EndError(issues.ToOneLineString());
- }
-
- ctx.Send(Owner, responseEv.Release());
- Die(ctx);
- }
-
-private:
- TActorId Owner;
- TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
- TMaybe<TString> Uid;
- TMaybe<TKqpQueryId> Query;
- bool KeepInCache = false;
- TInstant Deadline;
- TKqpDbCountersPtr DbCounters;
- THashMap<TTableId, ui64> TableVersions;
- THolder<TEvKqp::TEvCompileResponse> DeferredResponse;
- NLWTrace::TOrbit Orbit;
- NWilson::TSpan CompileRequestSpan;
-};
-
-
-IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
- TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit,
- NWilson::TTraceId traceId)
-{
- return new TKqpCompileRequestActor(
- owner,
- userToken,
- uid,
- std::move(query),
- keepInCache,
- deadline,
- dbCounters,
- std::move(orbit),
- std::move(traceId));
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 9ad13e9e662..711b9e927bc 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -178,7 +178,7 @@ private:
struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
- NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {})
+ ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {})
: Sender(sender)
, Query(std::move(query))
, Uid(uid)
@@ -187,7 +187,9 @@ struct TKqpCompileRequest {
, Deadline(deadline)
, DbCounters(dbCounters)
, Orbit(std::move(orbit))
- , CompileServiceSpan(std::move(span)) {}
+ , CompileServiceSpan(std::move(span))
+ , Cookie(cookie)
+ {}
TActorId Sender;
TKqpQueryId Query;
@@ -200,6 +202,7 @@ struct TKqpCompileRequest {
NLWTrace::TOrbit Orbit;
NWilson::TSpan CompileServiceSpan;
+ ui64 Cookie;
};
class TKqpRequestsQueue {
@@ -418,7 +421,7 @@ private:
}
catch (const std::exception& e) {
LogException("TEvCompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {});
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, ev->Cookie, std::move(ev->Get()->Orbit), {});
}
}
@@ -454,7 +457,7 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << *request.Uid);
- ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
+ ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
} else {
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
@@ -472,7 +475,7 @@ private:
<< ", queryUid: " << *request.Uid);
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid);
- ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
+ ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -495,7 +498,7 @@ private:
<< ", sender: " << ev->Sender
<< ", queryUid: " << compileResult->Uid);
- ReplyFromCache(ev->Sender, compileResult, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
+ ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -508,6 +511,7 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
+ ev->Cookie,
std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
@@ -519,7 +523,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
@@ -536,7 +540,7 @@ private:
}
catch (const std::exception& e) {
LogException("TEvRecompileRequest", ev->Sender, e, ctx);
- ReplyInternalError(ev->Sender, "", e.what(), ctx, std::move(ev->Get()->Orbit), {});
+ ReplyInternalError(ev->Sender, "", e.what(), ctx, ev->Cookie, std::move(ev->Get()->Orbit), {});
}
}
@@ -557,6 +561,7 @@ private:
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
true, request.UserToken, request.Deadline, dbCounters,
+ ev->Cookie,
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(CompileServiceSpan));
@@ -569,7 +574,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() <<
"Exceeded maximum number of requests in compile service queue.");
- ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
+ ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
}
} else {
@@ -581,7 +586,8 @@ private:
NWilson::TSpan CompileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
- ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
+ ReplyError(ev->Sender, request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx,
+ ev->Cookie, std::move(ev->Get()->Orbit), std::move(CompileServiceSpan));
return;
}
@@ -625,7 +631,8 @@ private:
auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query);
for (auto& request : requests) {
LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
- Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit), std::move(request.CompileServiceSpan));
+ Reply(request.Sender, compileResult, compileStats, ctx,
+ request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan));
}
} else {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
@@ -634,11 +641,13 @@ private:
}
LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
- Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
+ Reply(compileRequest.Sender, compileResult, compileStats, ctx,
+ compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
catch (const std::exception& e) {
LogException("TEvCompileResponse", ev->Sender, e, ctx);
- ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
+ ReplyInternalError(compileRequest.Sender, compileResult->Uid, e.what(), ctx,
+ compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
ProcessQueue(ctx);
@@ -696,7 +705,8 @@ private:
Counters->ReportCompileRequestTimeout(request->DbCounters);
NYql::TIssue issue(NYql::TPosition(), "Compilation timed out.");
- ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx, std::move(request->Orbit), std::move(request->CompileServiceSpan));
+ ReplyError(request->Sender, "", Ydb::StatusIds::TIMEOUT, {issue}, ctx,
+ request->Cookie, std::move(request->Orbit), std::move(request->CompileServiceSpan));
} else {
StartCompilation(std::move(*request), ctx);
}
@@ -724,7 +734,8 @@ private:
}
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
+ const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
+ NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
const auto& query = compileResult->Query;
LWTRACK(KqpCompileServiceReply,
@@ -744,34 +755,34 @@ private:
span.End();
}
- ctx.Send(sender, responseEv.Release());
+ ctx.Send(sender, responseEv.Release(), 0, cookie);
}
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
- const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
+ const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NKqpProto::TKqpStatsCompile stats;
stats.SetFromCache(true);
LWTRACK(KqpCompileServiceReplyFromCache, orbit);
- Reply(sender, compileResult, stats, ctx, std::move(orbit), std::move(span));
+ Reply(sender, compileResult, stats, ctx, cookie, std::move(orbit), std::move(span));
}
void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status,
- const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
+ const TIssues& issues, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyError, orbit);
- Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit), std::move(span));
+ Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, cookie, std::move(orbit), std::move(span));
}
void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,
- const TActorContext& ctx, NLWTrace::TOrbit orbit, NWilson::TSpan span)
+ const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation.");
issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message));
LWTRACK(KqpCompileServiceReplyInternalError, orbit);
- ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit), std::move(span));
+ ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, cookie, std::move(orbit), std::move(span));
}
static void LogException(const TString& scope, const TActorId& sender, const std::exception& e,
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h
index 5f7b408724f..aca598f2748 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.h
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.h
@@ -21,7 +21,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
- NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {});
+ ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TTraceId = {});
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/compile_service/ya.make b/ydb/core/kqp/compile_service/ya.make
index 2a0bfa651da..3fadfdad449 100644
--- a/ydb/core/kqp/compile_service/ya.make
+++ b/ydb/core/kqp/compile_service/ya.make
@@ -2,7 +2,6 @@ LIBRARY()
SRCS(
kqp_compile_actor.cpp
- kqp_compile_request.cpp
kqp_compile_service.cpp
)
diff --git a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt
index d241a939b3a..3b2e287ad0f 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt
@@ -23,4 +23,5 @@ target_sources(core-kqp-session_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp
)
diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt
index d54d507f651..88931dbd4f6 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt
@@ -24,4 +24,5 @@ target_sources(core-kqp-session_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp
)
diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt
index d54d507f651..88931dbd4f6 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt
@@ -24,4 +24,5 @@ target_sources(core-kqp-session_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp
)
diff --git a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt
index d241a939b3a..3b2e287ad0f 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt
@@ -23,4 +23,5 @@ target_sources(core-kqp-session_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_worker_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/session_actor/kqp_query_state.cpp
)
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
new file mode 100644
index 00000000000..ac6b84def53
--- /dev/null
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -0,0 +1,192 @@
+#include "kqp_query_state.h"
+
+namespace NKikimr::NKqp {
+
+using namespace NSchemeCache;
+
+#define LOG_C(msg) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_E(msg) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_W(msg) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_N(msg) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_I(msg) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+
+bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response) {
+ Y_ENSURE(response.Request);
+ const auto& navigate = *response.Request;
+
+ for (const auto& entry : navigate.ResultSet) {
+ switch (entry.Status) {
+ case TSchemeCacheNavigate::EStatus::Ok: {
+ auto expectedVersion = TableVersions.FindPtr(TTableId(entry.TableId.PathId));
+ if (!expectedVersion) {
+ LOG_W("Unexpected tableId in scheme cache navigate reply"
+ << ", tableId: " << entry.TableId);
+ continue;
+ }
+
+ if (!*expectedVersion) {
+ // Do not check tables with zero version.
+ continue;
+ }
+
+ if (entry.TableId.SchemaVersion && entry.TableId.SchemaVersion != *expectedVersion) {
+ LOG_I("Scheme version mismatch"
+ << ", pathId: " << entry.TableId.PathId
+ << ", expected version: " << *expectedVersion
+ << ", actual version: " << entry.TableId.SchemaVersion);
+ return false;
+ }
+
+ break;
+ }
+
+ case TSchemeCacheNavigate::EStatus::PathErrorUnknown:
+ case TSchemeCacheNavigate::EStatus::PathNotTable:
+ case TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
+ LOG_I("Scheme error"
+ << ", pathId: " << entry.TableId.PathId
+ << ", status: " << entry.Status);
+ return false;
+
+ case TSchemeCacheNavigate::EStatus::LookupError:
+ case TSchemeCacheNavigate::EStatus::RedirectLookupError:
+ // Transient error, do not invalidate the query.
+ // Hard validation will be performed later during the query execution.
+ break;
+
+ default:
+ // Unexpected reply, do not invalidate the query as it may block the query execution.
+ // Hard validation will be performed later during the query execution.
+ LOG_E("Unexpected reply from scheme cache"
+ << ", pathId: " << entry.TableId.PathId
+ << ", status: " << entry.Status);
+ break;
+ }
+ }
+
+ return true;
+}
+
+
+std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildNavigateKeySet() {
+ TableVersions.clear();
+
+ for (const auto& tx : PreparedQuery->GetPhysicalQuery().GetTransactions()) {
+ FillTables(tx);
+ }
+
+ auto navigate = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ navigate->DatabaseName = Database;
+ if (UserToken && !UserToken->GetSerializedToken().empty()) {
+ navigate->UserToken = UserToken;
+ }
+
+ for (const auto& [tableId, _] : TableVersions) {
+ NSchemeCache::TSchemeCacheNavigate::TEntry entry;
+ entry.TableId = tableId;
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpTable;
+ entry.SyncVersion = false;
+ entry.ShowPrivatePath = true;
+ navigate->ResultSet.emplace_back(entry);
+ }
+
+ return std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(navigate.Release());
+}
+
+
+bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
+ CompileResult = ev->CompileResult;
+ YQL_ENSURE(CompileResult);
+ MaxReadType = CompileResult->MaxReadType;
+ Orbit = std::move(ev->Orbit);
+
+ if (CompileResult->Status != Ydb::StatusIds::SUCCESS)
+ return false;
+
+ YQL_ENSURE(CompileResult->PreparedQuery);
+ const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion();
+ YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
+ "Unexpected prepared query version: " << compiledVersion);
+
+ CompileStats.Swap(&ev->Stats);
+ PreparedQuery = CompileResult->PreparedQuery;
+ return true;
+}
+
+std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() {
+ TMaybe<TKqpQueryId> query;
+ TMaybe<TString> uid;
+
+ bool keepInCache = false;
+ switch (GetAction()) {
+ case NKikimrKqp::QUERY_ACTION_EXECUTE:
+ query = TKqpQueryId(Cluster, Database, GetQuery(), GetType());
+ keepInCache = GetQueryKeepInCache() && query->IsSql();
+ break;
+
+ case NKikimrKqp::QUERY_ACTION_PREPARE:
+ query = TKqpQueryId(Cluster, Database, GetQuery(), GetType());
+ keepInCache = query->IsSql();
+ break;
+
+ case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
+ uid = GetPreparedQuery();
+ keepInCache = GetQueryKeepInCache();
+ break;
+
+ default:
+ YQL_ENSURE(false);
+ }
+
+ if (query) {
+ query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_;
+ }
+
+ auto compileDeadline = QueryDeadlines.TimeoutAt;
+ if (QueryDeadlines.CancelAt) {
+ compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
+ }
+
+ return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, uid,
+ std::move(query), keepInCache, compileDeadline, DbCounters, std::move(Orbit));
+}
+
+std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest() {
+ YQL_ENSURE(CompileResult);
+ TMaybe<TKqpQueryId> query;
+ TMaybe<TString> uid;
+
+ switch (GetAction()) {
+ case NKikimrKqp::QUERY_ACTION_EXECUTE:
+ query = TKqpQueryId(Cluster, Database, GetQuery(), GetType());
+ break;
+
+ case NKikimrKqp::QUERY_ACTION_PREPARE:
+ query = TKqpQueryId(Cluster, Database, GetQuery(), GetType());
+ break;
+
+ case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
+ uid = GetPreparedQuery();
+ break;
+
+ default:
+ YQL_ENSURE(false);
+ }
+
+ if (query) {
+ query->Settings.DocumentApiRestricted = IsDocumentApiRestricted_;
+ }
+
+ auto compileDeadline = QueryDeadlines.TimeoutAt;
+ if (QueryDeadlines.CancelAt) {
+ compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
+ }
+
+ return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid,
+ CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit));
+}
+
+}
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
new file mode 100644
index 00000000000..6d2fb731fe4
--- /dev/null
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -0,0 +1,288 @@
+#pragma once
+
+#include "kqp_worker_common.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
+#include <library/cpp/actors/wilson/wilson_trace.h>
+
+#include <ydb/core/base/cputime.h>
+#include <ydb/core/base/wilson.h>
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/common/kqp_resolve.h>
+#include <ydb/core/kqp/common/kqp_timeouts.h>
+#include <ydb/core/kqp/session_actor/kqp_tx.h>
+
+#include <util/generic/noncopyable.h>
+
+namespace NKikimr::NKqp {
+
+// basically it's a state that holds all the context
+// about the specific query execution.
+// it holds the unique pointer to the query request, which may include
+// the context of user RPC (if session is on the same node with user RPC, which is the most
+// common case).
+class TKqpQueryState : public TNonCopyable {
+public:
+ TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database,
+ const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession,
+ const NKikimrConfig::TTableServiceConfig& config, NWilson::TTraceId&& traceId)
+ : QueryId(queryId)
+ , Database(database)
+ , Cluster(cluster)
+ , DbCounters(dbCounters)
+ , Sender(ev->Sender)
+ , ProxyRequestId(ev->Cookie)
+ , ParametersSize(ev->Get()->GetParametersSize())
+ , RequestActorId(ev->Get()->GetRequestActorId())
+ , TraceId(ev->Get()->GetTraceId())
+ , IsDocumentApiRestricted_(IsDocumentApiRestricted(ev->Get()->GetRequestType()))
+ , StartTime(TInstant::Now())
+ , KeepSession(ev->Get()->GetKeepSession() || longSession)
+ , UserToken(ev->Get()->GetUserToken())
+ {
+ RequestEv.reset(ev->Release().Release());
+ SetQueryDeadlines(config);
+ auto action = GetAction();
+ KqpSessionSpan = NWilson::TSpan(
+ TWilsonKqp::KqpSession, std::move(traceId),
+ "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
+ }
+
+ // the monotonously growing counter, the ordinal number of the query,
+ // executed by the session.
+ // this counter may be used as a cookie by a session actor to reject events
+ // with cookie less than current QueryId.
+ ui64 QueryId = 0;
+ TString Database;
+ TString Cluster;
+ TKqpDbCountersPtr DbCounters;
+ TActorId Sender;
+ ui64 ProxyRequestId = 0;
+ std::unique_ptr<TEvKqp::TEvQueryRequest> RequestEv;
+ ui64 ParametersSize = 0;
+ TPreparedQueryHolder::TConstPtr PreparedQuery;
+ TKqpCompileResult::TConstPtr CompileResult;
+ NKqpProto::TKqpStatsCompile CompileStats;
+ TIntrusivePtr<TKqpTransactionContext> TxCtx;
+ TQueryData::TPtr QueryData;
+
+ TActorId RequestActorId;
+
+ ui64 CurrentTx = 0;
+ TString TraceId;
+ bool IsDocumentApiRestricted_ = false;
+
+ TInstant StartTime;
+ NYql::TKikimrQueryDeadlines QueryDeadlines;
+
+ NKqpProto::TKqpStatsQuery Stats;
+ bool KeepSession = false;
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+
+ THashMap<NKikimr::TTableId, ui64> TableVersions;
+
+ NLWTrace::TOrbit Orbit;
+ NWilson::TSpan KqpSessionSpan;
+ ETableReadType MaxReadType = ETableReadType::Other;
+
+ TTxId TxId; // User tx
+ bool Commit = false;
+ bool Commited = false;
+
+ NTopic::TTopicOperations TopicOperations;
+ TDuration CpuTime;
+ std::optional<NCpuTime::TCpuTimer> CurrentTimer;
+
+ NKikimrKqp::EQueryAction GetAction() const {
+ return RequestEv->GetAction();
+ }
+
+ bool GetKeepSession() const {
+ return RequestEv->GetKeepSession();
+ }
+
+ const TString& GetQuery() const {
+ return RequestEv->GetQuery();
+ }
+
+ const TString& GetPreparedQuery() const {
+ return RequestEv->GetPreparedQuery();
+ }
+
+ NKikimrKqp::EQueryType GetType() const {
+ return RequestEv->GetType();
+ }
+
+ void EnsureAction() {
+ YQL_ENSURE(RequestEv->HasAction());
+ }
+
+ bool GetUsePublicResponseDataFormat() const {
+ return RequestEv->GetUsePublicResponseDataFormat();
+ }
+
+ void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) {
+ auto now = TAppData::TimeProvider->Now();
+ auto cancelAfter = RequestEv->GetCancelAfter();
+ auto timeout = RequestEv->GetOperationTimeout();
+ if (cancelAfter.MilliSeconds() > 0) {
+ QueryDeadlines.CancelAt = now + cancelAfter;
+ }
+
+ auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service);
+ QueryDeadlines.TimeoutAt = now + timeoutMs;
+ }
+
+ bool HasTopicOperations() const {
+ return RequestEv->HasTopicOperations();
+ }
+
+ bool GetQueryKeepInCache() const {
+ return RequestEv->GetQueryKeepInCache();
+ }
+
+ const TString& GetDatabase() const {
+ return RequestEv->GetDatabase();
+ }
+
+ // todo: gvit
+ // fill this hash set only once on query compilation.
+ void FillTables(const NKqpProto::TKqpPhyTx& phyTx) {
+ for (const auto& stage : phyTx.GetStages()) {
+ auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) {
+ NKikimr::TTableId tableId(table.GetOwnerId(), table.GetTableId());
+ auto it = TableVersions.find(tableId);
+ if (it != TableVersions.end()) {
+ Y_ENSURE(it->second == table.GetVersion());
+ } else {
+ TableVersions.emplace(tableId, table.GetVersion());
+ }
+ };
+ for (const auto& tableOp : stage.GetTableOps()) {
+ addTable(tableOp.GetTable());
+ }
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ addTable(input.GetStreamLookup().GetTable());
+ }
+ }
+
+ for (const auto& source : stage.GetSources()) {
+ if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
+ addTable(source.GetReadRangesSource().GetTable());
+ }
+ }
+ }
+ }
+
+ bool NeedCheckTableVersions() const {
+ return CompileStats.GetFromCache();
+ }
+
+ TString ExtractQueryText() const {
+ if (CompileResult) {
+ if (CompileResult->Query) {
+ return CompileResult->Query->Text;
+ }
+ return {};
+ }
+ return RequestEv->GetQuery();
+ }
+
+ const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
+ return RequestEv->GetTopicOperations();
+ }
+
+ bool NeedPersistentSnapshot() const {
+ auto type = GetType();
+ return (
+ type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
+ type == NKikimrKqp::QUERY_TYPE_AST_SCAN
+ );
+ }
+
+ bool NeedSnapshot(const NYql::TKikimrConfiguration& config) const {
+ return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery());
+ }
+
+ bool HasTxControl() const {
+ return RequestEv->HasTxControl();
+ }
+
+ const ::Ydb::Table::TransactionControl& GetTxControl() const {
+ return RequestEv->GetTxControl();
+ }
+
+ const ::NKikimrMiniKQL::TParams& GetParameters() const {
+ return RequestEv->GetParameters();
+ }
+
+ // validate the compiled query response and ensure that all table versions are not
+ // changed since the last compilation.
+ bool EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response);
+ // builds a request to navigate schema of all tables, that participate in query
+ // execution.
+ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> BuildNavigateKeySet();
+ // same the context of the compiled query to the query state.
+ bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev);
+ // build the compilation request.
+ std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest();
+ // TODO(gvit): get rid of code duplication in these requests,
+ // use only one of these requests.
+ std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest();
+
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
+ return RequestEv->GetYdbParameters();
+ }
+
+ Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const {
+ if (!RequestEv->HasCollectStats()) {
+ return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
+ }
+
+ auto cStats = RequestEv->GetCollectStats();
+ if (cStats == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) {
+ return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
+ }
+
+ return cStats;
+ }
+
+ bool ReportStats() const {
+ return GetStatsMode() != Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE
+ // always report stats for scripting subrequests
+ || GetType() == NKikimrKqp::QUERY_TYPE_AST_DML
+ || GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN
+ ;
+ }
+
+ bool HasPreparedQuery() const {
+ return RequestEv->HasPreparedQuery();
+ }
+
+ bool IsStreamResult() const {
+ auto type = GetType();
+ return (
+ type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_QUERY ||
+ type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY
+ );
+ }
+
+ void ResetTimer() {
+ if (CurrentTimer) {
+ CpuTime += CurrentTimer->GetTime();
+ CurrentTimer.reset();
+ }
+ }
+
+ TDuration GetCpuTime() {
+ ResetTimer();
+ return CpuTime;
+ }
+};
+
+
+}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index ac2ce73af44..a52d197028f 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1,6 +1,7 @@
#include "kqp_session_actor.h"
#include "kqp_tx.h"
#include "kqp_worker_common.h"
+#include "kqp_query_state.h"
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/common/kqp_ru_calc.h>
@@ -44,6 +45,7 @@ namespace NKikimr {
namespace NKqp {
using namespace NYql;
+using namespace NSchemeCache;
namespace {
@@ -67,180 +69,6 @@ public:
{}
};
-struct TKqpQueryState {
- TActorId Sender;
- ui64 ProxyRequestId = 0;
- std::unique_ptr<TEvKqp::TEvQueryRequest> RequestEv;
- ui64 ParametersSize = 0;
- TPreparedQueryHolder::TConstPtr PreparedQuery;
- TKqpCompileResult::TConstPtr CompileResult;
- NKqpProto::TKqpStatsCompile CompileStats;
- TIntrusivePtr<TKqpTransactionContext> TxCtx;
- TQueryData::TPtr QueryData;
-
- TActorId RequestActorId;
-
- ui64 CurrentTx = 0;
- TString TraceId;
- bool IsDocumentApiRestricted = false;
-
- TInstant StartTime;
- NYql::TKikimrQueryDeadlines QueryDeadlines;
-
- NKqpProto::TKqpStatsQuery Stats;
- bool KeepSession = false;
- TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
-
- NLWTrace::TOrbit Orbit;
- NWilson::TSpan KqpSessionSpan;
- ETableReadType MaxReadType = ETableReadType::Other;
-
- TTxId TxId; // User tx
- bool Commit = false;
- bool Commited = false;
-
- NTopic::TTopicOperations TopicOperations;
- TDuration CpuTime;
- std::optional<NCpuTime::TCpuTimer> CurrentTimer;
-
- NKikimrKqp::EQueryAction GetAction() const {
- return RequestEv->GetAction();
- }
-
- bool GetKeepSession() const {
- return RequestEv->GetKeepSession();
- }
-
- const TString& GetQuery() const {
- return RequestEv->GetQuery();
- }
-
- const TString& GetPreparedQuery() const {
- return RequestEv->GetPreparedQuery();
- }
-
- NKikimrKqp::EQueryType GetType() const {
- return RequestEv->GetType();
- }
-
- void EnsureAction() {
- YQL_ENSURE(RequestEv->HasAction());
- }
-
- bool GetUsePublicResponseDataFormat() const {
- return RequestEv->GetUsePublicResponseDataFormat();
- }
-
- void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) {
- auto now = TAppData::TimeProvider->Now();
- auto cancelAfter = RequestEv->GetCancelAfter();
- auto timeout = RequestEv->GetOperationTimeout();
- if (cancelAfter.MilliSeconds() > 0) {
- QueryDeadlines.CancelAt = now + cancelAfter;
- }
-
- auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service);
- QueryDeadlines.TimeoutAt = now + timeoutMs;
- }
-
- bool HasTopicOperations() const {
- return RequestEv->HasTopicOperations();
- }
-
- bool GetQueryKeepInCache() const {
- return RequestEv->GetQueryKeepInCache();
- }
-
- const TString& GetDatabase() const {
- return RequestEv->GetDatabase();
- }
-
- TString ExtractQueryText() const {
- if (CompileResult) {
- if (CompileResult->Query) {
- return CompileResult->Query->Text;
- }
- return {};
- }
- return RequestEv->GetQuery();
- }
-
- const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
- return RequestEv->GetTopicOperations();
- }
-
- bool NeedPersistentSnapshot() const {
- auto type = GetType();
- return (
- type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
- type == NKikimrKqp::QUERY_TYPE_AST_SCAN
- );
- }
-
- bool HasTxControl() const {
- return RequestEv->HasTxControl();
- }
-
- const ::Ydb::Table::TransactionControl& GetTxControl() const {
- return RequestEv->GetTxControl();
- }
-
- const ::NKikimrMiniKQL::TParams& GetParameters() const {
- return RequestEv->GetParameters();
- }
-
- const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
- return RequestEv->GetYdbParameters();
- }
-
- Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const {
- if (!RequestEv->HasCollectStats()) {
- return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
- }
-
- auto cStats = RequestEv->GetCollectStats();
- if (cStats == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) {
- return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
- }
-
- return cStats;
- }
-
- bool ReportStats() const {
- return GetStatsMode() != Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE
- // always report stats for scripting subrequests
- || GetType() == NKikimrKqp::QUERY_TYPE_AST_DML
- || GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN
- ;
- }
-
- bool HasPreparedQuery() const {
- return RequestEv->HasPreparedQuery();
- }
-
- bool IsStreamResult() const {
- auto type = GetType();
- return (
- type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
- type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
- type == NKikimrKqp::QUERY_TYPE_SQL_QUERY ||
- type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY
- );
- }
-
- void ResetTimer() {
- if (CurrentTimer) {
- CpuTime += CurrentTimer->GetTime();
- CurrentTimer.reset();
- }
- }
-
- TDuration GetCpuTime() {
- ResetTimer();
- return CpuTime;
- }
-};
-
struct TKqpCleanupCtx {
std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
bool IsWaitingForWorkerToClose = false;
@@ -325,29 +153,17 @@ public:
void MakeNewQueryState(TEvKqp::TEvQueryRequest::TPtr& ev) {
++QueryId;
YQL_ENSURE(!QueryState);
- QueryState = std::make_shared<TKqpQueryState>();
- QueryState->Sender = ev->Sender;
- QueryState->ProxyRequestId = ev->Cookie;
- QueryState->TraceId = ev->Get()->GetTraceId();
- QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(ev->Get()->GetRequestType());
- QueryState->StartTime = TInstant::Now();
- QueryState->UserToken = ev->Get()->GetUserToken();
- QueryState->ParametersSize = ev->Get()->GetParametersSize();
- QueryState->RequestActorId = ev->Get()->GetRequestActorId();
- auto selfId = SelfId();
- auto as = TActivationContext::ActorSystem();
- ev->Get()->SetClientLostAction(selfId, as);
- QueryState->RequestEv.reset(ev->Release().Release());
- QueryState->KeepSession = Settings.LongSession || QueryState->GetKeepSession();
- QueryState->SetQueryDeadlines(Settings.Service);
- auto action = QueryState->GetAction();
-
NWilson::TTraceId id;
if (false) { // change to enable Wilson tracing
id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>());
LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId()));
}
- QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
+ auto selfId = SelfId();
+ auto as = TActivationContext::ActorSystem();
+ ev->Get()->SetClientLostAction(selfId, as);
+ QueryState = std::make_shared<TKqpQueryState>(
+ ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
+ Settings.Service, std::move(id));
}
bool ConvertParameters() {
@@ -614,46 +430,8 @@ public:
void CompileQuery() {
YQL_ENSURE(QueryState);
- TMaybe<TKqpQueryId> query;
- TMaybe<TString> uid;
-
- bool keepInCache = false;
- switch (QueryState->GetAction()) {
- case NKikimrKqp::QUERY_ACTION_EXECUTE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, QueryState->GetQuery(), QueryState->GetType());
- keepInCache = QueryState->GetQueryKeepInCache() && query->IsSql();
- break;
-
- case NKikimrKqp::QUERY_ACTION_PREPARE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, QueryState->GetQuery(), QueryState->GetType());
- keepInCache = query->IsSql();
- break;
-
- case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
- uid = QueryState->GetPreparedQuery();
- keepInCache = QueryState->GetQueryKeepInCache();
- break;
-
- default:
- YQL_ENSURE(false);
- }
-
- if (query) {
- query->Settings.DocumentApiRestricted = QueryState->IsDocumentApiRestricted;
- }
-
- auto compileDeadline = QueryState->QueryDeadlines.TimeoutAt;
- if (QueryState->QueryDeadlines.CancelAt) {
- compileDeadline = Min(compileDeadline, QueryState->QueryDeadlines.CancelAt);
- }
-
- auto compileRequestActor = CreateKqpCompileRequestActor(SelfId(), QueryState->UserToken, uid,
- std::move(query), keepInCache, compileDeadline, Settings.DbCounters,
- QueryState ? std::move(QueryState->Orbit) : NLWTrace::TOrbit(),
- QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId());
-
- CompileActorId = RegisterWithSameMailbox(compileRequestActor);
-
+ auto ev = QueryState->BuildCompileRequest();
+ Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
Become(&TKqpSessionActor::CompileState);
}
@@ -661,38 +439,53 @@ public:
ReplyBusy(ev);
}
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ const auto* response = ev->Get();
+
+ // table versions are not the same. need the query recompilation.
+ if (!QueryState->EnsureTableVersions(*response)) {
+ auto ev = QueryState->BuildReCompileRequest();
+ Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
+ return;
+ }
+
+ OnSuccessCompileRequest();
+ }
+
void Handle(TEvKqp::TEvCompileResponse::TPtr& ev) {
- if (ev->Sender != CompileActorId) {
+ // outdated event from previous query.
+ // ignoring that.
+ if (ev->Cookie < QueryId) {
return;
}
- CompileActorId = TActorId{};
- TTimerGuard timer(this);
- auto compileResult = ev->Get()->CompileResult;
- YQL_ENSURE(compileResult);
YQL_ENSURE(QueryState);
- QueryState->MaxReadType = compileResult->MaxReadType;
- QueryState->Orbit = std::move(ev->Get()->Orbit);
-
- LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status);
+ TTimerGuard timer(this);
- if (compileResult->Status != Ydb::StatusIds::SUCCESS) {
- ReplyQueryCompileError(compileResult);
+ // saving compile response and checking that compilation status
+ // is success.
+ if (!QueryState->SaveAndCheckCompileResult(ev->Get())) {
+ LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status);
+ ReplyQueryCompileError();
return;
}
- YQL_ENSURE(compileResult->PreparedQuery);
- const ui32 compiledVersion = compileResult->PreparedQuery->GetVersion();
- YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
- "Unexpected prepared query version: " << compiledVersion);
+ LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status);
+ // even if we have successfully compilation result, it doesn't mean anything
+ // in terms of current schema version of the table if response of compilation is from the cache.
+ // because of that, we are forcing to run schema version check
+ if (QueryState->NeedCheckTableVersions()) {
+ auto ev = QueryState->BuildNavigateKeySet();
+ Send(MakeSchemeCacheID(), ev.release());
+ return;
+ }
- QueryState->CompileResult = compileResult;
- QueryState->CompileStats.Swap(&ev->Get()->Stats);
- QueryState->PreparedQuery = compileResult->PreparedQuery;
+ OnSuccessCompileRequest();
+ }
+ void OnSuccessCompileRequest() {
if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) {
- ReplyPrepareResult(compileResult);
- return;
+ return ReplyPrepareResult();
}
if (!PrepareQueryContext()) {
@@ -704,11 +497,11 @@ public:
QueryState->TxCtx->OnBeginQuery();
if (QueryState->NeedPersistentSnapshot()) {
- return AcquirePersistentSnapshot();
- } else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit,
- QueryState->PreparedQuery->GetPhysicalQuery()))
- {
- return AcquireMvccSnapshot();
+ AcquirePersistentSnapshot();
+ return;
+ } else if (QueryState->NeedSnapshot(*Config)) {
+ AcquireMvccSnapshot();
+ return;
}
// Can reply inside (in case of deferred-only transactions) and become ReadyState
@@ -917,9 +710,10 @@ public:
}
}
- void ReplyPrepareResult(const TKqpCompileResult::TConstPtr& compileResult) {
+ void ReplyPrepareResult() {
+ YQL_ENSURE(QueryState);
QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>();
- FillCompileStatus(compileResult, QueryResponse->Record);
+ FillCompileStatus(QueryState->CompileResult, QueryResponse->Record);
auto ru = NRuCalc::CpuTimeToUnit(TDuration::MicroSeconds(QueryState->CompileStats.GetCpuTimeUs()));
auto& record = QueryResponse->Record.GetRef();
@@ -1405,11 +1199,6 @@ public:
void Handle(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
- if (CompileActorId) {
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded");
- Send(std::exchange(CompileActorId, {}), abortEv.Release());
- }
-
if (ExecuterId) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded");
Send(std::exchange(ExecuterId, {}), abortEv.Release());
@@ -1639,9 +1428,10 @@ public:
Cleanup();
}
- void ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) {
+ void ReplyQueryCompileError() {
+ YQL_ENSURE(QueryState);
QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>();
- FillCompileStatus(compileResult, QueryResponse->Record);
+ FillCompileStatus(QueryState->CompileResult, QueryResponse->Record);
auto txId = TTxId();
if (QueryState->HasTxControl()) {
@@ -1651,7 +1441,7 @@ public:
}
}
- LOG_W("ReplyQueryCompileError, status " << compileResult->Status << " remove tx with tx_id: " << txId.GetHumanStr());
+ LOG_W("ReplyQueryCompileError, status " << QueryState->CompileResult->Status << " remove tx with tx_id: " << txId.GetHumanStr());
if (auto ctx = Transactions.ReleaseTransaction(txId)) {
ctx->Invalidate();
Transactions.AddToBeAborted(std::move(ctx));
@@ -2008,6 +1798,7 @@ public:
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
default:
UnexpectedEvent("ReadyState", ev);
}
@@ -2032,6 +1823,7 @@ public:
// forgotten messages from previous aborted request
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
default:
UnexpectedEvent("CompileState", ev);
}
@@ -2061,6 +1853,7 @@ public:
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
@@ -2089,6 +1882,7 @@ public:
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
@@ -2261,7 +2055,6 @@ private:
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TKqpSettings::TConstPtr KqpSettings;
std::optional<TActorId> WorkerId;
- TActorId CompileActorId;
TActorId ExecuterId;
std::shared_ptr<TKqpQueryState> QueryState;
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h
index cb2ee2a7cf8..481fa2ce896 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.h
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include "kqp_session_actor.h"
#include <ydb/core/kqp/common/kqp.h>
diff --git a/ydb/core/kqp/session_actor/ya.make b/ydb/core/kqp/session_actor/ya.make
index 600976976f1..3fa3c490f82 100644
--- a/ydb/core/kqp/session_actor/ya.make
+++ b/ydb/core/kqp/session_actor/ya.make
@@ -6,6 +6,7 @@ SRCS(
kqp_tx.cpp
kqp_worker_actor.cpp
kqp_worker_common.cpp
+ kqp_query_state.cpp
)
PEERDIR(