aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-02-14 15:16:12 +0300
committergvit <gvit@ydb.tech>2023-02-14 15:16:12 +0300
commit11b72c5b82735d8c975d1a077877123ca4989589 (patch)
treeeac942a5dff39700067c71fdf653ac8c21cff9eb
parent4f395457741333ecbc606ef3ef91606181d7c2f2 (diff)
downloadydb-11b72c5b82735d8c975d1a077877123ca4989589.tar.gz
refactor session actor
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp104
-rw-r--r--ydb/core/kqp/common/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/common/kqp.h211
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp64
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp138
7 files changed, 366 insertions, 154 deletions
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index a71133cb646..a7eab0d1763 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -25,77 +25,6 @@ using namespace NKqp;
using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
Ydb::Table::ExecuteDataQueryResponse>;
-void SerializeQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& in, NKikimrKqp::TEvQueryRequest* dst) noexcept {
- auto req = TEvExecuteDataQueryRequest::GetProtoRequest(in);
-
- SetAuthToken(*dst, *in.get());
- SetDatabase(*dst, *in.get());
-
- dst->MutableRequest()->SetSessionId(req->session_id());
- dst->MutableRequest()->SetUsePublicResponseDataFormat(true);
-
- if (auto traceId = in->GetTraceId()) {
- dst->SetTraceId(traceId.GetRef());
- }
-
- if (auto requestType = in->GetRequestType()) {
- dst->SetRequestType(requestType.GetRef());
- }
-
- const auto& operationParams = req->operation_params();
- const auto& operationTimeout = GetDuration(operationParams.operation_timeout());
- const auto& cancelAfter = GetDuration(operationParams.cancel_after());
-
- dst->MutableRequest()->SetCancelAfterMs(cancelAfter.MilliSeconds());
- dst->MutableRequest()->SetTimeoutMs(operationTimeout.MilliSeconds());
-
- dst->MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control());
- dst->MutableRequest()->MutableQueryCachePolicy()->CopyFrom(req->query_cache_policy());
- dst->MutableRequest()->SetStatsMode(GetKqpStatsMode(req->collect_stats()));
- dst->MutableRequest()->SetCollectStats(req->collect_stats());
-
- const auto& query = req->query();
-
- switch (query.query_case()) {
- case Query::kYqlText: {
- dst->MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- dst->MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- dst->MutableRequest()->SetQuery(query.yql_text());
- break;
- }
-
- case Query::kId: {
- dst->MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED);
- dst->MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_PREPARED_DML);
-
- TString preparedQueryId;
- try {
- preparedQueryId = DecodePreparedQueryId(query.id());
- } catch (const std::exception& ex) {
- NYql::TIssues issues;
- issues.AddIssue(NYql::ExceptionToIssue(ex));
-
- dst->SetYdbStatus(Ydb::StatusIds::BAD_REQUEST);
- NYql::IssuesToMessage(issues, dst->MutableQueryIssues());
- return;
- }
-
- dst->MutableRequest()->SetPreparedQuery(preparedQueryId);
- break;
- }
-
- default: {
- NYql::TIssues issues;
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
- dst->SetYdbStatus(Ydb::StatusIds::BAD_REQUEST);
- NYql::IssuesToMessage(issues, dst->MutableQueryIssues());
- return;
- }
- }
-
- dst->MutableRequest()->MutableYdbParameters()->insert(req->parameters().begin(), req->parameters().end());
-}
-
class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest> {
using TBase = TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest>;
@@ -153,6 +82,10 @@ public:
}
auto& query = req->query();
+ TString yqlText;
+ TString queryId;
+ NKikimrKqp::EQueryAction queryAction;
+ NKikimrKqp::EQueryType queryType;
switch (query.query_case()) {
case Query::kYqlText: {
@@ -160,6 +93,9 @@ public:
if (!CheckQuery(query.yql_text(), issues)) {
return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
}
+ queryAction = NKikimrKqp::QUERY_ACTION_EXECUTE;
+ queryType = NKikimrKqp::QUERY_TYPE_SQL_DML;
+ yqlText = query.yql_text();
break;
}
@@ -169,6 +105,18 @@ public:
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty query id"));
return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
}
+
+ try {
+ queryId = DecodePreparedQueryId(query.id());
+ } catch (const std::exception& ex) {
+ NYql::TIssues issues;
+ issues.AddIssue(NYql::ExceptionToIssue(ex));
+ return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
+ return;
+ }
+
+ queryAction = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED;
+ queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML;
break;
}
@@ -179,7 +127,19 @@ public:
}
}
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(Request_, SerializeQueryRequest, SelfId());
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
+ Request_,
+ req->session_id(),
+ SelfId(),
+ std::move(yqlText),
+ std::move(queryId),
+ queryAction,
+ queryType,
+ &req->tx_control(),
+ &req->parameters(),
+ req->collect_stats(),
+ &req->query_cache_policy(),
+ &req->operation_params());
ev->PrepareRemote();
ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt
index 7ffd2b4be82..b7c74e48b60 100644
--- a/ydb/core/kqp/common/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/common/CMakeLists.darwin.txt
@@ -29,6 +29,7 @@ target_link_libraries(core-kqp-common PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_event_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
index b669a94abbc..aa4bfc498cc 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
@@ -30,6 +30,7 @@ target_link_libraries(core-kqp-common PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_event_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt
index b669a94abbc..aa4bfc498cc 100644
--- a/ydb/core/kqp/common/CMakeLists.linux.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux.txt
@@ -30,6 +30,7 @@ target_link_libraries(core-kqp-common PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_event_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index e4161d225a6..477ae39811b 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -254,10 +254,30 @@ struct TEvKqp {
struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
public:
- using TSerializerCb = void (*)(std::shared_ptr<NGRpcService::IRequestCtxMtSafe>&, NKikimrKqp::TEvQueryRequest*) noexcept;
- TEvQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, TSerializerCb cb, TActorId actorId)
+ TEvQueryRequest(
+ std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx,
+ const TString& sessionId,
+ TActorId actorId,
+ TString&& yqlText,
+ TString&& queryId,
+ NKikimrKqp::EQueryAction queryAction,
+ NKikimrKqp::EQueryType queryType,
+ const ::Ydb::Table::TransactionControl* txControl,
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters,
+ const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
+ const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
+ const ::Ydb::Operations::OperationParams* operationParams)
: RequestCtx(ctx)
- , SerializerCb(cb)
+ , SessionId(sessionId)
+ , YqlText(std::move(yqlText))
+ , QueryId(std::move(queryId))
+ , QueryAction(queryAction)
+ , QueryType(queryType)
+ , TxControl(txControl)
+ , YdbParameters(ydbParameters)
+ , CollectStats(collectStats)
+ , QueryCachePolicy(queryCachePolicy)
+ , OperationParams(operationParams)
{
ActorIdToProto(actorId, Record.MutableCancelationActor());
}
@@ -272,6 +292,168 @@ struct TEvKqp {
// Same as TEventPBBase but without Rope (but can contain Payload and will lose some data after all)
TEventSerializationInfo CreateSerializationInfo() const override { return {}; }
+ bool HasYdbStatus() const {
+ if (RequestCtx) {
+ return false;
+ }
+
+ return Record.HasYdbStatus();
+ }
+
+ bool HasTopicOperations() const {
+ return Record.GetRequest().HasTopicOperations();
+ }
+
+ bool HasAction() const {
+ if (RequestCtx) {
+ // passed directly to constructor.
+ return true;
+ } else {
+ return Record.GetRequest().HasAction();
+ }
+ }
+
+ const TString& GetSessionId() const {
+ if (RequestCtx) {
+ return SessionId;
+ }
+
+ return Record.GetRequest().GetSessionId();
+ }
+
+ NKikimrKqp::EQueryAction GetAction() const {
+ if (RequestCtx) {
+ return QueryAction;
+ }
+
+ return Record.GetRequest().GetAction();
+
+ }
+
+ NKikimrKqp::EQueryType GetType() const {
+ if (RequestCtx) {
+ return QueryType;
+ }
+
+ return Record.GetRequest().GetType();
+ }
+
+ bool HasPreparedQuery() const {
+ if (!QueryId.empty()) {
+ return true;
+ }
+
+ return Record.GetRequest().HasPreparedQuery();
+ }
+
+ const TString& GetPreparedQuery() const {
+ if (!QueryId.empty()) {
+ return QueryId;
+ }
+
+ return Record.GetRequest().GetPreparedQuery();
+ }
+
+ const TString& GetQuery() const {
+ if (!YqlText.empty()) {
+ return YqlText;
+ }
+
+ return Record.GetRequest().GetQuery();
+ }
+
+ const ::NKikimrMiniKQL::TParams& GetParameters() const {
+ return Record.GetRequest().GetParameters();
+ }
+
+ const ::Ydb::Table::TransactionControl& GetTxControl() const {
+ if (TxControl) {
+ return *TxControl;
+ }
+
+ return Record.GetRequest().GetTxControl();
+ }
+
+ bool GetUsePublicResponseDataFormat() const {
+ if (RequestCtx) {
+ return true;
+ }
+
+ return Record.GetRequest().GetUsePublicResponseDataFormat();
+ }
+
+ const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const {
+ if (QueryCachePolicy) {
+ return *QueryCachePolicy;
+ }
+
+ return Record.GetRequest().GetQueryCachePolicy();
+ }
+
+ bool HasTxControl() const {
+ if (TxControl) {
+ return true;
+ }
+
+ return Record.GetRequest().HasTxControl();
+ }
+
+ TActorId GetRequestActorId() const {
+ return ActorIdFromProto(Record.GetRequestActorId());
+ }
+
+ const TString& GetTraceId() const {
+ if (RequestCtx) {
+ if (auto traceId = RequestCtx->GetTraceId()) {
+ return traceId.GetRef();
+ }
+ }
+
+ return Record.GetTraceId();
+ }
+
+ const TString& GetRequestType() const {
+ if (RequestCtx) {
+ if (auto requestType = RequestCtx->GetRequestType()) {
+ return requestType.GetRef();
+ }
+ }
+
+ return Record.GetRequestType();
+ }
+
+ const TString& GetUserToken() const {
+ if (RequestCtx && RequestCtx->GetInternalToken()) {
+ return RequestCtx->GetInternalToken();
+ }
+
+ return Record.GetUserToken();
+ }
+
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
+ if (YdbParameters) {
+ return *YdbParameters;
+ }
+
+ return Record.GetRequest().GetYdbParameters();
+ }
+
+ Ydb::StatusIds::StatusCode GetYdbStatus() const {
+ return Record.GetYdbStatus();
+ }
+
+ ::Ydb::Table::QueryStatsCollection::Mode GetCollectStats() const {
+ if (RequestCtx) {
+ return CollectStats;
+ }
+
+ return Record.GetRequest().GetCollectStats();
+ }
+
+ const ::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& GetQueryIssues() const {
+ return Record.GetQueryIssues();
+ }
+
ui64 GetRequestSize() const {
return Record.GetRequest().ByteSizeLong();
}
@@ -322,18 +504,23 @@ struct TEvKqp {
}
}
- void PrepareRemote() const {
- if (RequestCtx) {
- Y_VERIFY(SerializerCb);
- SerializerCb(RequestCtx, &Record);
- RequestCtx.reset();
- }
- }
- mutable ui64 ParametersSize = 0;
+ void PrepareRemote() const;
+
mutable NKikimrKqp::TEvQueryRequest Record;
+
private:
+ mutable ui64 ParametersSize = 0;
mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx;
- TSerializerCb SerializerCb;
+ TString SessionId;
+ TString YqlText;
+ TString QueryId;
+ NKikimrKqp::EQueryAction QueryAction;
+ NKikimrKqp::EQueryType QueryType;
+ const ::Ydb::Table::TransactionControl* TxControl = nullptr;
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr;
+ const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
+ const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr;
+ const ::Ydb::Operations::OperationParams* OperationParams = nullptr;
};
struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest,
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
new file mode 100644
index 00000000000..f2368275526
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -0,0 +1,64 @@
+#include "kqp.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/util/proto_duration.h>
+
+namespace NKikimr::NKqp {
+
+void TEvKqp::TEvQueryRequest::PrepareRemote() const {
+ if (RequestCtx) {
+ if (RequestCtx->GetInternalToken()) {
+ Record.SetUserToken(RequestCtx->GetInternalToken());
+ }
+
+ Record.MutableRequest()->SetDatabase(
+ CanonizePath(RequestCtx->GetDatabaseName().GetOrElse("")));
+
+ if (auto traceId = RequestCtx->GetTraceId()) {
+ Record.SetTraceId(traceId.GetRef());
+ }
+
+ if (auto requestType = RequestCtx->GetRequestType()) {
+ Record.SetRequestType(requestType.GetRef());
+ }
+
+ if (TxControl) {
+ Record.MutableRequest()->MutableTxControl()->CopyFrom(*TxControl);
+ }
+
+ if (YdbParameters) {
+ Record.MutableRequest()->MutableYdbParameters()->insert(YdbParameters->begin(), YdbParameters->end());
+ }
+
+ if (QueryCachePolicy) {
+ Record.MutableRequest()->MutableQueryCachePolicy()->CopyFrom(*QueryCachePolicy);
+ }
+
+ if (CollectStats) {
+ Record.MutableRequest()->SetCollectStats(CollectStats);
+ }
+
+ if (!YqlText.empty()) {
+ Record.MutableRequest()->SetQuery(YqlText);
+ }
+
+ if (!QueryId.empty()) {
+ Record.MutableRequest()->SetPreparedQuery(QueryId);
+ }
+
+ Record.MutableRequest()->SetSessionId(SessionId);
+ Record.MutableRequest()->SetAction(QueryAction);
+ Record.MutableRequest()->SetType(QueryType);
+ if (OperationParams) {
+ const auto& operationTimeout = GetDuration(OperationParams->operation_timeout());
+ const auto& cancelAfter = GetDuration(OperationParams->cancel_after());
+
+ Record.MutableRequest()->SetCancelAfterMs(cancelAfter.MilliSeconds());
+ Record.MutableRequest()->SetTimeoutMs(operationTimeout.MilliSeconds());
+ }
+
+ RequestCtx.reset();
+ }
+}
+
+}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 01eccf30174..814220cc8c3 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -70,7 +70,7 @@ public:
struct TKqpQueryState {
TActorId Sender;
ui64 ProxyRequestId = 0;
- NKikimrKqp::TQueryRequest RequestProto;
+ std::unique_ptr<TEvKqp::TEvQueryRequest> RequestEv;
ui64 ParametersSize = 0;
TPreparedQueryHolder::TConstPtr PreparedQuery;
TKqpCompileResult::TConstPtr CompileResult;
@@ -103,54 +103,58 @@ struct TKqpQueryState {
TDuration CpuTime;
std::optional<NCpuTime::TCpuTimer> CurrentTimer;
+ const NKikimrKqp::TQueryRequest& RequestProto() const {
+ return RequestEv->Record.GetRequest();
+ }
+
NKikimrKqp::EQueryAction GetAction() const {
- return RequestProto.GetAction();
+ return RequestEv->GetAction();
}
bool GetKeepSession() const {
- return RequestProto.GetKeepSession();
+ return RequestProto().GetKeepSession();
}
const TString& GetQuery() const {
- return RequestProto.GetQuery();
+ return RequestEv->GetQuery();
}
const TString& GetPreparedQuery() const {
- return RequestProto.GetPreparedQuery();
+ return RequestEv->GetPreparedQuery();
}
NKikimrKqp::EQueryType GetType() const {
- return RequestProto.HasType() ? RequestProto.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED;
+ return RequestEv->GetType();
}
void EnsureAction() {
- YQL_ENSURE(RequestProto.HasAction());
+ YQL_ENSURE(RequestEv->HasAction());
}
bool GetUsePublicResponseDataFormat() const {
- return RequestProto.GetUsePublicResponseDataFormat();
+ return RequestEv->GetUsePublicResponseDataFormat();
}
void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) {
auto now = TAppData::TimeProvider->Now();
- if (RequestProto.GetCancelAfterMs()) {
- QueryDeadlines.CancelAt = now + TDuration::MilliSeconds(RequestProto.GetCancelAfterMs());
+ if (RequestProto().GetCancelAfterMs()) {
+ QueryDeadlines.CancelAt = now + TDuration::MilliSeconds(RequestProto().GetCancelAfterMs());
}
- auto timeoutMs = GetQueryTimeout(GetType(), RequestProto.GetTimeoutMs(), service);
+ auto timeoutMs = GetQueryTimeout(GetType(), RequestProto().GetTimeoutMs(), service);
QueryDeadlines.TimeoutAt = now + timeoutMs;
}
bool HasTopicOperations() const {
- return RequestProto.HasTopicOperations();
+ return RequestEv->HasTopicOperations();
}
const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const {
- return RequestProto.GetQueryCachePolicy();
+ return RequestEv->GetQueryCachePolicy();
}
const TString& GetDatabase() const {
- return RequestProto.GetDatabase();
+ return RequestProto().GetDatabase();
}
TString ExtractQueryText() const {
@@ -160,11 +164,11 @@ struct TKqpQueryState {
}
return {};
}
- return RequestProto.GetQuery();
+ return RequestProto().GetQuery();
}
const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
- return RequestProto.GetTopicOperations();
+ return RequestProto().GetTopicOperations();
}
bool NeedPersistentSnapshot() const {
@@ -176,31 +180,31 @@ struct TKqpQueryState {
}
bool HasTxControl() const {
- return RequestProto.HasTxControl();
+ return RequestEv->HasTxControl();
}
const ::Ydb::Table::TransactionControl& GetTxControl() const {
- return RequestProto.GetTxControl();
+ return RequestEv->GetTxControl();
}
const ::NKikimrMiniKQL::TParams& GetParameters() const {
- return RequestProto.GetParameters();
+ return RequestEv->GetParameters();
}
const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
- return RequestProto.GetYdbParameters();
+ return RequestEv->GetYdbParameters();
}
Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const {
- if (!RequestProto.HasCollectStats()) {
+ if (!RequestProto().HasCollectStats()) {
return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
}
- if (RequestProto.GetCollectStats() == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) {
+ if (RequestProto().GetCollectStats() == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) {
return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
}
- return RequestProto.GetCollectStats();
+ return RequestProto().GetCollectStats();
}
bool CollectStatsDefined() const {
@@ -208,7 +212,7 @@ struct TKqpQueryState {
}
bool HasPreparedQuery() const {
- return RequestProto.HasPreparedQuery();
+ return RequestProto().HasPreparedQuery();
}
bool IsStreamResult() const {
@@ -316,19 +320,39 @@ public:
++QueryId;
YQL_ENSURE(!QueryState);
QueryState = std::make_shared<TKqpQueryState>();
- QueryState->RequestProto.Swap(ev->Get()->Record.MutableRequest());
+ QueryState->Sender = ev->Sender;
+ QueryState->ProxyRequestId = ev->Cookie;
+ QueryState->TraceId = ev->Get()->GetTraceId();
+ QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(ev->Get()->GetRequestType());
+ QueryState->StartTime = TInstant::Now();
+ QueryState->UserToken = ev->Get()->GetUserToken();
+ QueryState->ParametersSize = ev->Get()->GetParametersSize();
+ QueryState->RequestActorId = ev->Get()->GetRequestActorId();
+ auto selfId = SelfId();
+ auto as = TActivationContext::ActorSystem();
+ ev->Get()->SetClientLostAction(selfId, as);
+ QueryState->RequestEv.reset(ev->Release().Release());
+ QueryState->KeepSession = Settings.LongSession || QueryState->GetKeepSession();
+ QueryState->SetQueryDeadlines(Settings.Service);
+ auto action = QueryState->GetAction();
+
+ NWilson::TTraceId id;
+ if (false) { // change to enable Wilson tracing
+ id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>());
+ LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId()));
+ }
+ QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
}
- bool ConvertParameters(TEvKqp::TEvQueryRequest::TPtr& ev) {
- ui64 proxyRequestId = ev->Cookie;
- auto& event = ev->Get()->Record;
+ bool ConvertParameters() {
+ auto& event = QueryState->RequestEv->Record;
if (!event.GetRequest().HasParameters() && event.GetRequest().YdbParametersSize()) {
try {
ConvertYdbParamsToMiniKQLParams(event.GetRequest().GetYdbParameters(), *event.MutableRequest()->MutableParameters());
} catch (const std::exception& ex) {
TString message = TStringBuilder() << "Failed to parse query parameters. "<< ex.what();
- ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_REQUEST, message);
+ ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, Ydb::StatusIds::BAD_REQUEST, message);
return false;
}
}
@@ -337,9 +361,7 @@ public:
}
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
- ev->Get()->Record.MutableRequest()->Swap(&QueryState->RequestProto);
-
- if (!ConvertParameters(ev))
+ if (!ConvertParameters())
return;
if (!WorkerId) {
@@ -347,7 +369,7 @@ public:
ModuleResolverState, Counters));
WorkerId = RegisterWithSameMailbox(workerActor.release());
}
- TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), ev->Release().Release(), ev->Flags, ev->Cookie,
+ TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie,
nullptr, std::move(ev->TraceId)));
Become(&TKqpSessionActor::ExecuteState);
}
@@ -443,20 +465,19 @@ public:
YQL_ENSURE(event.GetRequest().GetSessionId() == SessionId,
"Invalid session, expected: " << SessionId << ", got: " << event.GetRequest().GetSessionId());
- if (event.HasYdbStatus()) {
- if (event.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(event.GetQueryIssues(), issues);
- TString errMsg = issues.ToString();
-
- LOG_N(TKqpRequestInfo("", SessionId)
- << "Got invalid query request, reply with status: "
- << event.GetYdbStatus()
- << " msg: "
- << errMsg <<".");
- ReplyProcessError(ev->Sender, proxyRequestId, event.GetYdbStatus(), errMsg);
- return;
- }
+ if (ev->Get()->HasYdbStatus() && ev->Get()->GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(ev->Get()->GetQueryIssues(), issues);
+ TString errMsg = issues.ToString();
+ auto status = ev->Get()->GetYdbStatus();
+
+ LOG_N(TKqpRequestInfo("", SessionId)
+ << "Got invalid query request, reply with status: "
+ << status
+ << " msg: "
+ << errMsg <<".");
+ ReplyProcessError(ev->Sender, proxyRequestId, status, errMsg);
+ return;
}
if (ShutdownState && ShutdownState->SoftTimeoutReached()) {
@@ -475,13 +496,6 @@ public:
QueryState->EnsureAction();
auto action = QueryState->GetAction();
- NWilson::TTraceId id;
- if (false) { // change to enable Wilson tracing
- id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>());
- LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId()));
- }
- QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
-
LWTRACK(KqpSessionQueryRequest,
QueryState->Orbit,
QueryState->GetDatabase(),
@@ -489,14 +503,6 @@ public:
action,
QueryState->GetQuery());
- QueryState->Sender = ev->Sender;
- QueryState->ProxyRequestId = proxyRequestId;
- QueryState->TraceId = event.GetTraceId();
- QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(event.GetRequestType());
- QueryState->StartTime = TInstant::Now();
- QueryState->UserToken = event.GetUserToken();
- QueryState->SetQueryDeadlines(Settings.Service);
-
LOG_D("received request,"
<< " proxyRequestId: " << proxyRequestId
<< " prepared: " << QueryState->HasPreparedQuery()
@@ -506,14 +512,6 @@ public:
<< " text: " << QueryState->GetQuery()
);
- QueryState->ParametersSize = ev->Get()->GetParametersSize();
- QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId());
- QueryState->KeepSession = Settings.LongSession || QueryState->GetKeepSession();
-
- auto selfId = SelfId();
- auto as = TActivationContext::ActorSystem();
- ev->Get()->SetClientLostAction(selfId, as);
-
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
case NKikimrKqp::QUERY_ACTION_PREPARE: