diff options
author | gvit <gvit@ydb.tech> | 2023-02-14 15:16:12 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-02-14 15:16:12 +0300 |
commit | 11b72c5b82735d8c975d1a077877123ca4989589 (patch) | |
tree | eac942a5dff39700067c71fdf653ac8c21cff9eb | |
parent | 4f395457741333ecbc606ef3ef91606181d7c2f2 (diff) | |
download | ydb-11b72c5b82735d8c975d1a077877123ca4989589.tar.gz |
refactor session actor
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_data_query.cpp | 104 | ||||
-rw-r--r-- | ydb/core/kqp/common/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 211 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_event_impl.cpp | 64 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 138 |
7 files changed, 366 insertions, 154 deletions
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index a71133cb646..a7eab0d1763 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -25,77 +25,6 @@ using namespace NKqp; using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse>; -void SerializeQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& in, NKikimrKqp::TEvQueryRequest* dst) noexcept { - auto req = TEvExecuteDataQueryRequest::GetProtoRequest(in); - - SetAuthToken(*dst, *in.get()); - SetDatabase(*dst, *in.get()); - - dst->MutableRequest()->SetSessionId(req->session_id()); - dst->MutableRequest()->SetUsePublicResponseDataFormat(true); - - if (auto traceId = in->GetTraceId()) { - dst->SetTraceId(traceId.GetRef()); - } - - if (auto requestType = in->GetRequestType()) { - dst->SetRequestType(requestType.GetRef()); - } - - const auto& operationParams = req->operation_params(); - const auto& operationTimeout = GetDuration(operationParams.operation_timeout()); - const auto& cancelAfter = GetDuration(operationParams.cancel_after()); - - dst->MutableRequest()->SetCancelAfterMs(cancelAfter.MilliSeconds()); - dst->MutableRequest()->SetTimeoutMs(operationTimeout.MilliSeconds()); - - dst->MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control()); - dst->MutableRequest()->MutableQueryCachePolicy()->CopyFrom(req->query_cache_policy()); - dst->MutableRequest()->SetStatsMode(GetKqpStatsMode(req->collect_stats())); - dst->MutableRequest()->SetCollectStats(req->collect_stats()); - - const auto& query = req->query(); - - switch (query.query_case()) { - case Query::kYqlText: { - dst->MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - dst->MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - dst->MutableRequest()->SetQuery(query.yql_text()); - break; - } - - case Query::kId: { - dst->MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED); - dst->MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_PREPARED_DML); - - TString preparedQueryId; - try { - preparedQueryId = DecodePreparedQueryId(query.id()); - } catch (const std::exception& ex) { - NYql::TIssues issues; - issues.AddIssue(NYql::ExceptionToIssue(ex)); - - dst->SetYdbStatus(Ydb::StatusIds::BAD_REQUEST); - NYql::IssuesToMessage(issues, dst->MutableQueryIssues()); - return; - } - - dst->MutableRequest()->SetPreparedQuery(preparedQueryId); - break; - } - - default: { - NYql::TIssues issues; - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); - dst->SetYdbStatus(Ydb::StatusIds::BAD_REQUEST); - NYql::IssuesToMessage(issues, dst->MutableQueryIssues()); - return; - } - } - - dst->MutableRequest()->MutableYdbParameters()->insert(req->parameters().begin(), req->parameters().end()); -} - class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest> { using TBase = TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest>; @@ -153,6 +82,10 @@ public: } auto& query = req->query(); + TString yqlText; + TString queryId; + NKikimrKqp::EQueryAction queryAction; + NKikimrKqp::EQueryType queryType; switch (query.query_case()) { case Query::kYqlText: { @@ -160,6 +93,9 @@ public: if (!CheckQuery(query.yql_text(), issues)) { return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); } + queryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; + queryType = NKikimrKqp::QUERY_TYPE_SQL_DML; + yqlText = query.yql_text(); break; } @@ -169,6 +105,18 @@ public: issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty query id")); return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); } + + try { + queryId = DecodePreparedQueryId(query.id()); + } catch (const std::exception& ex) { + NYql::TIssues issues; + issues.AddIssue(NYql::ExceptionToIssue(ex)); + return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); + return; + } + + queryAction = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED; + queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML; break; } @@ -179,7 +127,19 @@ public: } } - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(Request_, SerializeQueryRequest, SelfId()); + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( + Request_, + req->session_id(), + SelfId(), + std::move(yqlText), + std::move(queryId), + queryAction, + queryType, + &req->tx_control(), + &req->parameters(), + req->collect_stats(), + &req->query_cache_policy(), + &req->operation_params()); ev->PrepareRemote(); ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt index 7ffd2b4be82..b7c74e48b60 100644 --- a/ydb/core/kqp/common/CMakeLists.darwin.txt +++ b/ydb/core/kqp/common/CMakeLists.darwin.txt @@ -29,6 +29,7 @@ target_link_libraries(core-kqp-common PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_event_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt index b669a94abbc..aa4bfc498cc 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt @@ -30,6 +30,7 @@ target_link_libraries(core-kqp-common PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_event_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt index b669a94abbc..aa4bfc498cc 100644 --- a/ydb/core/kqp/common/CMakeLists.linux.txt +++ b/ydb/core/kqp/common/CMakeLists.linux.txt @@ -30,6 +30,7 @@ target_link_libraries(core-kqp-common PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_event_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index e4161d225a6..477ae39811b 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -254,10 +254,30 @@ struct TEvKqp { struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> { public: - using TSerializerCb = void (*)(std::shared_ptr<NGRpcService::IRequestCtxMtSafe>&, NKikimrKqp::TEvQueryRequest*) noexcept; - TEvQueryRequest(std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, TSerializerCb cb, TActorId actorId) + TEvQueryRequest( + std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, + const TString& sessionId, + TActorId actorId, + TString&& yqlText, + TString&& queryId, + NKikimrKqp::EQueryAction queryAction, + NKikimrKqp::EQueryType queryType, + const ::Ydb::Table::TransactionControl* txControl, + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters, + const ::Ydb::Table::QueryStatsCollection::Mode collectStats, + const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, + const ::Ydb::Operations::OperationParams* operationParams) : RequestCtx(ctx) - , SerializerCb(cb) + , SessionId(sessionId) + , YqlText(std::move(yqlText)) + , QueryId(std::move(queryId)) + , QueryAction(queryAction) + , QueryType(queryType) + , TxControl(txControl) + , YdbParameters(ydbParameters) + , CollectStats(collectStats) + , QueryCachePolicy(queryCachePolicy) + , OperationParams(operationParams) { ActorIdToProto(actorId, Record.MutableCancelationActor()); } @@ -272,6 +292,168 @@ struct TEvKqp { // Same as TEventPBBase but without Rope (but can contain Payload and will lose some data after all) TEventSerializationInfo CreateSerializationInfo() const override { return {}; } + bool HasYdbStatus() const { + if (RequestCtx) { + return false; + } + + return Record.HasYdbStatus(); + } + + bool HasTopicOperations() const { + return Record.GetRequest().HasTopicOperations(); + } + + bool HasAction() const { + if (RequestCtx) { + // passed directly to constructor. + return true; + } else { + return Record.GetRequest().HasAction(); + } + } + + const TString& GetSessionId() const { + if (RequestCtx) { + return SessionId; + } + + return Record.GetRequest().GetSessionId(); + } + + NKikimrKqp::EQueryAction GetAction() const { + if (RequestCtx) { + return QueryAction; + } + + return Record.GetRequest().GetAction(); + + } + + NKikimrKqp::EQueryType GetType() const { + if (RequestCtx) { + return QueryType; + } + + return Record.GetRequest().GetType(); + } + + bool HasPreparedQuery() const { + if (!QueryId.empty()) { + return true; + } + + return Record.GetRequest().HasPreparedQuery(); + } + + const TString& GetPreparedQuery() const { + if (!QueryId.empty()) { + return QueryId; + } + + return Record.GetRequest().GetPreparedQuery(); + } + + const TString& GetQuery() const { + if (!YqlText.empty()) { + return YqlText; + } + + return Record.GetRequest().GetQuery(); + } + + const ::NKikimrMiniKQL::TParams& GetParameters() const { + return Record.GetRequest().GetParameters(); + } + + const ::Ydb::Table::TransactionControl& GetTxControl() const { + if (TxControl) { + return *TxControl; + } + + return Record.GetRequest().GetTxControl(); + } + + bool GetUsePublicResponseDataFormat() const { + if (RequestCtx) { + return true; + } + + return Record.GetRequest().GetUsePublicResponseDataFormat(); + } + + const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const { + if (QueryCachePolicy) { + return *QueryCachePolicy; + } + + return Record.GetRequest().GetQueryCachePolicy(); + } + + bool HasTxControl() const { + if (TxControl) { + return true; + } + + return Record.GetRequest().HasTxControl(); + } + + TActorId GetRequestActorId() const { + return ActorIdFromProto(Record.GetRequestActorId()); + } + + const TString& GetTraceId() const { + if (RequestCtx) { + if (auto traceId = RequestCtx->GetTraceId()) { + return traceId.GetRef(); + } + } + + return Record.GetTraceId(); + } + + const TString& GetRequestType() const { + if (RequestCtx) { + if (auto requestType = RequestCtx->GetRequestType()) { + return requestType.GetRef(); + } + } + + return Record.GetRequestType(); + } + + const TString& GetUserToken() const { + if (RequestCtx && RequestCtx->GetInternalToken()) { + return RequestCtx->GetInternalToken(); + } + + return Record.GetUserToken(); + } + + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const { + if (YdbParameters) { + return *YdbParameters; + } + + return Record.GetRequest().GetYdbParameters(); + } + + Ydb::StatusIds::StatusCode GetYdbStatus() const { + return Record.GetYdbStatus(); + } + + ::Ydb::Table::QueryStatsCollection::Mode GetCollectStats() const { + if (RequestCtx) { + return CollectStats; + } + + return Record.GetRequest().GetCollectStats(); + } + + const ::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& GetQueryIssues() const { + return Record.GetQueryIssues(); + } + ui64 GetRequestSize() const { return Record.GetRequest().ByteSizeLong(); } @@ -322,18 +504,23 @@ struct TEvKqp { } } - void PrepareRemote() const { - if (RequestCtx) { - Y_VERIFY(SerializerCb); - SerializerCb(RequestCtx, &Record); - RequestCtx.reset(); - } - } - mutable ui64 ParametersSize = 0; + void PrepareRemote() const; + mutable NKikimrKqp::TEvQueryRequest Record; + private: + mutable ui64 ParametersSize = 0; mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx; - TSerializerCb SerializerCb; + TString SessionId; + TString YqlText; + TString QueryId; + NKikimrKqp::EQueryAction QueryAction; + NKikimrKqp::EQueryType QueryType; + const ::Ydb::Table::TransactionControl* TxControl = nullptr; + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr; + const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; + const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr; + const ::Ydb::Operations::OperationParams* OperationParams = nullptr; }; struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest, diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp new file mode 100644 index 00000000000..f2368275526 --- /dev/null +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -0,0 +1,64 @@ +#include "kqp.h" + +#include <ydb/core/base/path.h> +#include <ydb/core/util/proto_duration.h> + +namespace NKikimr::NKqp { + +void TEvKqp::TEvQueryRequest::PrepareRemote() const { + if (RequestCtx) { + if (RequestCtx->GetInternalToken()) { + Record.SetUserToken(RequestCtx->GetInternalToken()); + } + + Record.MutableRequest()->SetDatabase( + CanonizePath(RequestCtx->GetDatabaseName().GetOrElse(""))); + + if (auto traceId = RequestCtx->GetTraceId()) { + Record.SetTraceId(traceId.GetRef()); + } + + if (auto requestType = RequestCtx->GetRequestType()) { + Record.SetRequestType(requestType.GetRef()); + } + + if (TxControl) { + Record.MutableRequest()->MutableTxControl()->CopyFrom(*TxControl); + } + + if (YdbParameters) { + Record.MutableRequest()->MutableYdbParameters()->insert(YdbParameters->begin(), YdbParameters->end()); + } + + if (QueryCachePolicy) { + Record.MutableRequest()->MutableQueryCachePolicy()->CopyFrom(*QueryCachePolicy); + } + + if (CollectStats) { + Record.MutableRequest()->SetCollectStats(CollectStats); + } + + if (!YqlText.empty()) { + Record.MutableRequest()->SetQuery(YqlText); + } + + if (!QueryId.empty()) { + Record.MutableRequest()->SetPreparedQuery(QueryId); + } + + Record.MutableRequest()->SetSessionId(SessionId); + Record.MutableRequest()->SetAction(QueryAction); + Record.MutableRequest()->SetType(QueryType); + if (OperationParams) { + const auto& operationTimeout = GetDuration(OperationParams->operation_timeout()); + const auto& cancelAfter = GetDuration(OperationParams->cancel_after()); + + Record.MutableRequest()->SetCancelAfterMs(cancelAfter.MilliSeconds()); + Record.MutableRequest()->SetTimeoutMs(operationTimeout.MilliSeconds()); + } + + RequestCtx.reset(); + } +} + +} diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 01eccf30174..814220cc8c3 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -70,7 +70,7 @@ public: struct TKqpQueryState { TActorId Sender; ui64 ProxyRequestId = 0; - NKikimrKqp::TQueryRequest RequestProto; + std::unique_ptr<TEvKqp::TEvQueryRequest> RequestEv; ui64 ParametersSize = 0; TPreparedQueryHolder::TConstPtr PreparedQuery; TKqpCompileResult::TConstPtr CompileResult; @@ -103,54 +103,58 @@ struct TKqpQueryState { TDuration CpuTime; std::optional<NCpuTime::TCpuTimer> CurrentTimer; + const NKikimrKqp::TQueryRequest& RequestProto() const { + return RequestEv->Record.GetRequest(); + } + NKikimrKqp::EQueryAction GetAction() const { - return RequestProto.GetAction(); + return RequestEv->GetAction(); } bool GetKeepSession() const { - return RequestProto.GetKeepSession(); + return RequestProto().GetKeepSession(); } const TString& GetQuery() const { - return RequestProto.GetQuery(); + return RequestEv->GetQuery(); } const TString& GetPreparedQuery() const { - return RequestProto.GetPreparedQuery(); + return RequestEv->GetPreparedQuery(); } NKikimrKqp::EQueryType GetType() const { - return RequestProto.HasType() ? RequestProto.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED; + return RequestEv->GetType(); } void EnsureAction() { - YQL_ENSURE(RequestProto.HasAction()); + YQL_ENSURE(RequestEv->HasAction()); } bool GetUsePublicResponseDataFormat() const { - return RequestProto.GetUsePublicResponseDataFormat(); + return RequestEv->GetUsePublicResponseDataFormat(); } void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) { auto now = TAppData::TimeProvider->Now(); - if (RequestProto.GetCancelAfterMs()) { - QueryDeadlines.CancelAt = now + TDuration::MilliSeconds(RequestProto.GetCancelAfterMs()); + if (RequestProto().GetCancelAfterMs()) { + QueryDeadlines.CancelAt = now + TDuration::MilliSeconds(RequestProto().GetCancelAfterMs()); } - auto timeoutMs = GetQueryTimeout(GetType(), RequestProto.GetTimeoutMs(), service); + auto timeoutMs = GetQueryTimeout(GetType(), RequestProto().GetTimeoutMs(), service); QueryDeadlines.TimeoutAt = now + timeoutMs; } bool HasTopicOperations() const { - return RequestProto.HasTopicOperations(); + return RequestEv->HasTopicOperations(); } const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const { - return RequestProto.GetQueryCachePolicy(); + return RequestEv->GetQueryCachePolicy(); } const TString& GetDatabase() const { - return RequestProto.GetDatabase(); + return RequestProto().GetDatabase(); } TString ExtractQueryText() const { @@ -160,11 +164,11 @@ struct TKqpQueryState { } return {}; } - return RequestProto.GetQuery(); + return RequestProto().GetQuery(); } const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { - return RequestProto.GetTopicOperations(); + return RequestProto().GetTopicOperations(); } bool NeedPersistentSnapshot() const { @@ -176,31 +180,31 @@ struct TKqpQueryState { } bool HasTxControl() const { - return RequestProto.HasTxControl(); + return RequestEv->HasTxControl(); } const ::Ydb::Table::TransactionControl& GetTxControl() const { - return RequestProto.GetTxControl(); + return RequestEv->GetTxControl(); } const ::NKikimrMiniKQL::TParams& GetParameters() const { - return RequestProto.GetParameters(); + return RequestEv->GetParameters(); } const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const { - return RequestProto.GetYdbParameters(); + return RequestEv->GetYdbParameters(); } Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const { - if (!RequestProto.HasCollectStats()) { + if (!RequestProto().HasCollectStats()) { return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; } - if (RequestProto.GetCollectStats() == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) { + if (RequestProto().GetCollectStats() == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) { return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; } - return RequestProto.GetCollectStats(); + return RequestProto().GetCollectStats(); } bool CollectStatsDefined() const { @@ -208,7 +212,7 @@ struct TKqpQueryState { } bool HasPreparedQuery() const { - return RequestProto.HasPreparedQuery(); + return RequestProto().HasPreparedQuery(); } bool IsStreamResult() const { @@ -316,19 +320,39 @@ public: ++QueryId; YQL_ENSURE(!QueryState); QueryState = std::make_shared<TKqpQueryState>(); - QueryState->RequestProto.Swap(ev->Get()->Record.MutableRequest()); + QueryState->Sender = ev->Sender; + QueryState->ProxyRequestId = ev->Cookie; + QueryState->TraceId = ev->Get()->GetTraceId(); + QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(ev->Get()->GetRequestType()); + QueryState->StartTime = TInstant::Now(); + QueryState->UserToken = ev->Get()->GetUserToken(); + QueryState->ParametersSize = ev->Get()->GetParametersSize(); + QueryState->RequestActorId = ev->Get()->GetRequestActorId(); + auto selfId = SelfId(); + auto as = TActivationContext::ActorSystem(); + ev->Get()->SetClientLostAction(selfId, as); + QueryState->RequestEv.reset(ev->Release().Release()); + QueryState->KeepSession = Settings.LongSession || QueryState->GetKeepSession(); + QueryState->SetQueryDeadlines(Settings.Service); + auto action = QueryState->GetAction(); + + NWilson::TTraceId id; + if (false) { // change to enable Wilson tracing + id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>()); + LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId())); + } + QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); } - bool ConvertParameters(TEvKqp::TEvQueryRequest::TPtr& ev) { - ui64 proxyRequestId = ev->Cookie; - auto& event = ev->Get()->Record; + bool ConvertParameters() { + auto& event = QueryState->RequestEv->Record; if (!event.GetRequest().HasParameters() && event.GetRequest().YdbParametersSize()) { try { ConvertYdbParamsToMiniKQLParams(event.GetRequest().GetYdbParameters(), *event.MutableRequest()->MutableParameters()); } catch (const std::exception& ex) { TString message = TStringBuilder() << "Failed to parse query parameters. "<< ex.what(); - ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_REQUEST, message); + ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, Ydb::StatusIds::BAD_REQUEST, message); return false; } } @@ -337,9 +361,7 @@ public: } void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { - ev->Get()->Record.MutableRequest()->Swap(&QueryState->RequestProto); - - if (!ConvertParameters(ev)) + if (!ConvertParameters()) return; if (!WorkerId) { @@ -347,7 +369,7 @@ public: ModuleResolverState, Counters)); WorkerId = RegisterWithSameMailbox(workerActor.release()); } - TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), ev->Release().Release(), ev->Flags, ev->Cookie, + TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie, nullptr, std::move(ev->TraceId))); Become(&TKqpSessionActor::ExecuteState); } @@ -443,20 +465,19 @@ public: YQL_ENSURE(event.GetRequest().GetSessionId() == SessionId, "Invalid session, expected: " << SessionId << ", got: " << event.GetRequest().GetSessionId()); - if (event.HasYdbStatus()) { - if (event.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - NYql::TIssues issues; - NYql::IssuesFromMessage(event.GetQueryIssues(), issues); - TString errMsg = issues.ToString(); - - LOG_N(TKqpRequestInfo("", SessionId) - << "Got invalid query request, reply with status: " - << event.GetYdbStatus() - << " msg: " - << errMsg <<"."); - ReplyProcessError(ev->Sender, proxyRequestId, event.GetYdbStatus(), errMsg); - return; - } + if (ev->Get()->HasYdbStatus() && ev->Get()->GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + NYql::TIssues issues; + NYql::IssuesFromMessage(ev->Get()->GetQueryIssues(), issues); + TString errMsg = issues.ToString(); + auto status = ev->Get()->GetYdbStatus(); + + LOG_N(TKqpRequestInfo("", SessionId) + << "Got invalid query request, reply with status: " + << status + << " msg: " + << errMsg <<"."); + ReplyProcessError(ev->Sender, proxyRequestId, status, errMsg); + return; } if (ShutdownState && ShutdownState->SoftTimeoutReached()) { @@ -475,13 +496,6 @@ public: QueryState->EnsureAction(); auto action = QueryState->GetAction(); - NWilson::TTraceId id; - if (false) { // change to enable Wilson tracing - id = NWilson::TTraceId::NewTraceId(TWilsonKqp::KqpSession, Max<ui32>()); - LOG_I("wilson tracing started, id: " + std::to_string(id.GetTraceId())); - } - QueryState->KqpSessionSpan = NWilson::TSpan(TWilsonKqp::KqpSession, std::move(id), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); - LWTRACK(KqpSessionQueryRequest, QueryState->Orbit, QueryState->GetDatabase(), @@ -489,14 +503,6 @@ public: action, QueryState->GetQuery()); - QueryState->Sender = ev->Sender; - QueryState->ProxyRequestId = proxyRequestId; - QueryState->TraceId = event.GetTraceId(); - QueryState->IsDocumentApiRestricted = IsDocumentApiRestricted(event.GetRequestType()); - QueryState->StartTime = TInstant::Now(); - QueryState->UserToken = event.GetUserToken(); - QueryState->SetQueryDeadlines(Settings.Service); - LOG_D("received request," << " proxyRequestId: " << proxyRequestId << " prepared: " << QueryState->HasPreparedQuery() @@ -506,14 +512,6 @@ public: << " text: " << QueryState->GetQuery() ); - QueryState->ParametersSize = ev->Get()->GetParametersSize(); - QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId()); - QueryState->KeepSession = Settings.LongSession || QueryState->GetKeepSession(); - - auto selfId = SelfId(); - auto as = TActivationContext::ActorSystem(); - ev->Get()->SetClientLostAction(selfId, as); - switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: case NKikimrKqp::QUERY_ACTION_PREPARE: |