diff options
author | gvit <gvit@ydb.tech> | 2023-02-21 14:59:22 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-02-21 14:59:22 +0300 |
commit | edb2549e05cf120e034386d1b1664947195ccf6c (patch) | |
tree | e83dde458964144eaf097ba4db1724ec061e0074 | |
parent | b2ea1197cdc3ea11737b036349f91986e3a90743 (diff) | |
download | ydb-edb2549e05cf120e034386d1b1664947195ccf6c.tar.gz |
avoid copies into proto
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_data_query.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 143 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_event_impl.cpp | 45 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 45 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 50 |
7 files changed, 150 insertions, 146 deletions
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 170dd68c95..a3dbf019bb 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -138,9 +138,8 @@ public: &req->tx_control(), &req->parameters(), req->collect_stats(), - &req->query_cache_policy(), - &req->operation_params()); - ev->PrepareRemote(); + req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr, + req->has_operation_params() ? &req->operation_params() : nullptr); ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 09d7ae7220..cdd7119ae5 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -256,7 +256,7 @@ struct TEvKqp { struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> { public: TEvQueryRequest( - std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx, + const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx, const TString& sessionId, TActorId actorId, TString&& yqlText, @@ -267,22 +267,8 @@ struct TEvKqp { const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters, const ::Ydb::Table::QueryStatsCollection::Mode collectStats, const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, - const ::Ydb::Operations::OperationParams* operationParams) - : RequestCtx(ctx) - , SessionId(sessionId) - , YqlText(std::move(yqlText)) - , QueryId(std::move(queryId)) - , QueryAction(queryAction) - , QueryType(queryType) - , TxControl(txControl) - , YdbParameters(ydbParameters) - , CollectStats(collectStats) - , QueryCachePolicy(queryCachePolicy) - , OperationParams(operationParams) - { - ActorIdToProto(actorId, Record.MutableCancelationActor()); - } - + const ::Ydb::Operations::OperationParams* operationParams, + bool keepSession = false); TEvQueryRequest() = default; @@ -290,77 +276,70 @@ struct TEvKqp { return true; } - // Same as TEventPBBase but without Rope (but can contain Payload and will lose some data after all) TEventSerializationInfo CreateSerializationInfo() const override { return {}; } + const TString& GetDatabase() const { + return RequestCtx ? Database : Record.GetRequest().GetDatabase(); + } + bool HasYdbStatus() const { - if (RequestCtx) { - return false; - } + return RequestCtx ? false : Record.HasYdbStatus(); + } - return Record.HasYdbStatus(); + const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { + return Record.GetRequest().GetTopicOperations(); } bool HasTopicOperations() const { return Record.GetRequest().HasTopicOperations(); } + bool GetKeepSession() const { + return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession(); + } + + TDuration GetCancelAfter() const { + return RequestCtx ? CancelAfter : TDuration::MilliSeconds(Record.GetRequest().GetCancelAfterMs()); + } + + TDuration GetOperationTimeout() const { + return RequestCtx ? OperationTimeout : TDuration::MilliSeconds(Record.GetRequest().GetTimeoutMs()); + } + bool HasAction() const { + return RequestCtx ? true : Record.GetRequest().HasAction(); + } + + void SetSessionId(const TString& sessionId) { if (RequestCtx) { - // passed directly to constructor. - return true; + SessionId = sessionId; } else { - return Record.GetRequest().HasAction(); + Record.MutableRequest()->SetSessionId(sessionId); } } const TString& GetSessionId() const { - if (RequestCtx) { - return SessionId; - } - - return Record.GetRequest().GetSessionId(); + return RequestCtx ? SessionId : Record.GetRequest().GetSessionId(); } NKikimrKqp::EQueryAction GetAction() const { - if (RequestCtx) { - return QueryAction; - } - - return Record.GetRequest().GetAction(); - + return RequestCtx ? QueryAction : Record.GetRequest().GetAction(); } NKikimrKqp::EQueryType GetType() const { - if (RequestCtx) { - return QueryType; - } - - return Record.GetRequest().GetType(); + return RequestCtx ? QueryType : Record.GetRequest().GetType(); } bool HasPreparedQuery() const { - if (!QueryId.empty()) { - return true; - } - - return Record.GetRequest().HasPreparedQuery(); + return RequestCtx ? QueryId.size() > 0 : Record.GetRequest().HasPreparedQuery(); } const TString& GetPreparedQuery() const { - if (!QueryId.empty()) { - return QueryId; - } - - return Record.GetRequest().GetPreparedQuery(); + return RequestCtx ? QueryId : Record.GetRequest().GetPreparedQuery(); } const TString& GetQuery() const { - if (!YqlText.empty()) { - return YqlText; - } - - return Record.GetRequest().GetQuery(); + return RequestCtx ? YqlText : Record.GetRequest().GetQuery(); } const ::NKikimrMiniKQL::TParams& GetParameters() const { @@ -368,35 +347,29 @@ struct TEvKqp { } const ::Ydb::Table::TransactionControl& GetTxControl() const { - if (TxControl) { - return *TxControl; - } - - return Record.GetRequest().GetTxControl(); + return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl(); } bool GetUsePublicResponseDataFormat() const { - if (RequestCtx) { - return true; - } - - return Record.GetRequest().GetUsePublicResponseDataFormat(); + return RequestCtx ? true : Record.GetRequest().GetUsePublicResponseDataFormat(); } - const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const { - if (QueryCachePolicy) { - return *QueryCachePolicy; + bool GetQueryKeepInCache() const { + if (RequestCtx) { + if (QueryCachePolicy != nullptr) { + return QueryCachePolicy->keep_in_cache(); + } + return false; } - - return Record.GetRequest().GetQueryCachePolicy(); + return Record.GetRequest().GetQueryCachePolicy().keep_in_cache(); } bool HasTxControl() const { - if (TxControl) { - return true; - } + return RequestCtx ? TxControl != nullptr : Record.GetRequest().HasTxControl(); + } - return Record.GetRequest().HasTxControl(); + bool HasCollectStats() const { + return RequestCtx ? true : Record.GetRequest().HasCollectStats(); } TActorId GetRequestActorId() const { @@ -405,9 +378,10 @@ struct TEvKqp { const TString& GetTraceId() const { if (RequestCtx) { - if (auto traceId = RequestCtx->GetTraceId()) { - return traceId.GetRef(); + if (!TraceId) { + TraceId = RequestCtx->GetTraceId().GetOrElse(""); } + return TraceId; } return Record.GetTraceId(); @@ -415,9 +389,10 @@ struct TEvKqp { const TString& GetRequestType() const { if (RequestCtx) { - if (auto requestType = RequestCtx->GetRequestType()) { - return requestType.GetRef(); + if (!RequestType) { + RequestType = RequestCtx->GetRequestType().GetOrElse(""); } + return RequestType; } return Record.GetRequestType(); @@ -460,7 +435,7 @@ struct TEvKqp { } ui64 GetQuerySize() const { - return Record.GetRequest().GetQuery().size(); + return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size(); } ui64 GetParametersSize() const { @@ -469,7 +444,7 @@ struct TEvKqp { } ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong(); - for(const auto& [name, param]: Record.GetRequest().GetYdbParameters()) { + for(const auto& [name, param]: GetYdbParameters()) { ParametersSize += name.size(); ParametersSize += param.ByteSizeLong(); } @@ -512,6 +487,9 @@ struct TEvKqp { private: mutable ui64 ParametersSize = 0; mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx; + mutable TString TraceId; + mutable TString RequestType; + TString Database; TString SessionId; TString YqlText; TString QueryId; @@ -522,6 +500,9 @@ struct TEvKqp { const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr; const ::Ydb::Operations::OperationParams* OperationParams = nullptr; + bool KeepSession = false; + TDuration OperationTimeout; + TDuration CancelAfter; }; struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest, diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index f236827552..89b8102373 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -5,14 +5,48 @@ namespace NKikimr::NKqp { +TEvKqp::TEvQueryRequest::TEvQueryRequest( + const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx, + const TString& sessionId, + TActorId actorId, + TString&& yqlText, + TString&& queryId, + NKikimrKqp::EQueryAction queryAction, + NKikimrKqp::EQueryType queryType, + const ::Ydb::Table::TransactionControl* txControl, + const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters, + const ::Ydb::Table::QueryStatsCollection::Mode collectStats, + const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, + const ::Ydb::Operations::OperationParams* operationParams, + bool keepSession) + : RequestCtx(ctx) + , Database(CanonizePath(ctx->GetDatabaseName().GetOrElse(""))) + , SessionId(sessionId) + , YqlText(std::move(yqlText)) + , QueryId(std::move(queryId)) + , QueryAction(queryAction) + , QueryType(queryType) + , TxControl(txControl) + , YdbParameters(ydbParameters) + , CollectStats(collectStats) + , QueryCachePolicy(queryCachePolicy) + , OperationParams(operationParams) + , KeepSession(keepSession) +{ + if (OperationParams) { + OperationTimeout = GetDuration(OperationParams->operation_timeout()); + CancelAfter = GetDuration(OperationParams->cancel_after()); + } + ActorIdToProto(actorId, Record.MutableCancelationActor()); +} + void TEvKqp::TEvQueryRequest::PrepareRemote() const { if (RequestCtx) { if (RequestCtx->GetInternalToken()) { Record.SetUserToken(RequestCtx->GetInternalToken()); } - Record.MutableRequest()->SetDatabase( - CanonizePath(RequestCtx->GetDatabaseName().GetOrElse(""))); + Record.MutableRequest()->SetDatabase(Database); if (auto traceId = RequestCtx->GetTraceId()) { Record.SetTraceId(traceId.GetRef()); @@ -50,11 +84,8 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); if (OperationParams) { - const auto& operationTimeout = GetDuration(OperationParams->operation_timeout()); - const auto& cancelAfter = GetDuration(OperationParams->cancel_after()); - - Record.MutableRequest()->SetCancelAfterMs(cancelAfter.MilliSeconds()); - Record.MutableRequest()->SetTimeoutMs(operationTimeout.MilliSeconds()); + Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds()); + Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds()); } RequestCtx.reset(); diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index e8258c7c5e..cfdb10293e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -746,7 +746,7 @@ private: item.SetFlags(affectedFlags); } - ui64 sizeLimit = RequestControls.PerRequestDataSizeLimit; + ui64 sizeLimit = Request.PerRequestDataSizeLimit; if (Request.TotalReadSizeLimitBytes > 0) { sizeLimit = sizeLimit ? std::min(sizeLimit, Request.TotalReadSizeLimitBytes) @@ -1450,7 +1450,6 @@ private: NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); - RequestControls.Reqister(TlsActivationContext->AsActorContext()); size_t readActors = 0; ReadOnlyTx = !Request.TopicOperations.HasOperations(); @@ -1655,7 +1654,7 @@ private: } ui32 shardsLimit = Request.MaxAffectedShards; - if (i64 msc = (i64) RequestControls.MaxShardCount; msc > 0) { + if (i64 msc = (i64) Request.MaxShardCount; msc > 0) { shardsLimit = std::min(shardsLimit, (ui32) msc); } size_t shards = datashardTasks.size() + remoteComputeTasks.size(); @@ -2298,7 +2297,6 @@ private: bool HasStreamLookup = false; - NTxProxy::TRequestControls RequestControls; ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; THashMap<ui64, TShardState> TopicTabletStates; diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 51eb219757..53c243a84f 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -108,6 +108,8 @@ public: : TxAlloc(txAlloc) {} + NKikimr::TControlWrapper PerRequestDataSizeLimit; + NKikimr::TControlWrapper MaxShardCount; TVector<TPhysicalTxData> Transactions; TMap<ui64, TVector<NKikimrTxDataShard::TLock>> DataShardLocks; NKikimr::NKqp::TTxAllocatorState::TPtr TxAlloc; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 007d00e21f..55d2a26a50 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -482,37 +482,38 @@ public: } void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { - auto& event = ev->Get()->Record; - auto& request = *event.MutableRequest(); - TString traceId = event.GetTraceId(); + const TString& database = ev->Get()->GetDatabase(); + const TString& traceId = ev->Get()->GetTraceId(); + const auto queryType = ev->Get()->GetType(); + const auto queryAction = ev->Get()->GetAction(); TKqpRequestInfo requestInfo(traceId); ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvQueryRequest); - if (request.GetSessionId().empty()) { + if (ev->Get()->GetSessionId().empty()) { TProcessResult<TKqpSessionInfo*> result; if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false, - request.GetDatabase(), false, result)) + database, false, result)) { ReplyProcessError(result.YdbStatus, result.Error, requestId); return; } - request.SetSessionId(result.Value->SessionId); + ev->Get()->SetSessionId(result.Value->SessionId); } - TString sessionId = request.GetSessionId(); + const TString& sessionId = ev->Get()->GetSessionId(); const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; if (!dbCounters) { - dbCounters = Counters->GetDbCounters(request.GetDatabase()); + dbCounters = Counters->GetDbCounters(database); } PendingRequests.SetSessionId(requestId, sessionId, dbCounters); Counters->ReportQueryRequest(dbCounters, ev->Get()->GetRequestSize(), ev->Get()->GetParametersSize(), ev->Get()->GetQuerySize()); - Counters->ReportQueryAction(dbCounters, request.GetAction()); - Counters->ReportQueryType(dbCounters, request.GetType()); + Counters->ReportQueryAction(dbCounters, queryAction); + Counters->ReportQueryType(dbCounters, queryType); auto queryLimitBytes = TableServiceConfig.GetQueryLimitBytes(); - if (queryLimitBytes && IsSqlQuery(request.GetType()) && ev->Get()->GetQuerySize() > queryLimitBytes) { + if (queryLimitBytes && IsSqlQuery(queryType) && ev->Get()->GetQuerySize() > queryLimitBytes) { TString error = TStringBuilder() << "Query text size exceeds limit (" << ev->Get()->GetQuerySize() << "b > " << queryLimitBytes << "b)"; ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); @@ -527,26 +528,12 @@ public: return; } - if (request.HasTxControl() && request.GetTxControl().has_begin_tx()) { - switch (request.GetTxControl().begin_tx().tx_mode_case()) { - case Ydb::Table::TransactionSettings::kSnapshotReadOnly: - if (!AppData()->FeatureFlags.GetEnableMvccSnapshotReads()) { - ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, - "Snapshot reads not supported in current database", requestId); - return; - } - - default: - break; - } - } - TActorId targetId; if (sessionInfo) { targetId = sessionInfo->WorkerId; LocalSessions->StopIdleCheck(sessionInfo); } else { - targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId); + targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); if (!targetId) { return; } @@ -556,10 +543,12 @@ public: // because it is much better to give detailed error message rather than generic timeout. // For example, it helps to avoid race in event order when worker and proxy recieve timeout at the same moment. // If worker located in the different datacenter we should better substract some RTT estimate, but at this point it's not done. - auto timeoutMs = GetQueryTimeout(request.GetType(), request.GetTimeoutMs(), TableServiceConfig) + DEFAULT_EXTRA_TIMEOUT_WAIT; + auto timeout = ev->Get()->GetOperationTimeout(); + auto timeoutMs = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig) + DEFAULT_EXTRA_TIMEOUT_WAIT; StartQueryTimeout(requestId, timeoutMs); Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId); - KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "Sent request to target, requestId: " << requestId << ", targetId: " << targetId); + KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId + << ", targetId: " << targetId << ", sessionId: " << sessionId); } void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 9044d8f85e..033ee620fa 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -103,16 +103,12 @@ struct TKqpQueryState { TDuration CpuTime; std::optional<NCpuTime::TCpuTimer> CurrentTimer; - const NKikimrKqp::TQueryRequest& RequestProto() const { - return RequestEv->Record.GetRequest(); - } - NKikimrKqp::EQueryAction GetAction() const { return RequestEv->GetAction(); } bool GetKeepSession() const { - return RequestProto().GetKeepSession(); + return RequestEv->GetKeepSession(); } const TString& GetQuery() const { @@ -137,11 +133,13 @@ struct TKqpQueryState { void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) { auto now = TAppData::TimeProvider->Now(); - if (RequestProto().GetCancelAfterMs()) { - QueryDeadlines.CancelAt = now + TDuration::MilliSeconds(RequestProto().GetCancelAfterMs()); + auto cancelAfter = RequestEv->GetCancelAfter(); + auto timeout = RequestEv->GetOperationTimeout(); + if (cancelAfter.MilliSeconds() > 0) { + QueryDeadlines.CancelAt = now + cancelAfter; } - auto timeoutMs = GetQueryTimeout(GetType(), RequestProto().GetTimeoutMs(), service); + auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service); QueryDeadlines.TimeoutAt = now + timeoutMs; } @@ -149,12 +147,12 @@ struct TKqpQueryState { return RequestEv->HasTopicOperations(); } - const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const { - return RequestEv->GetQueryCachePolicy(); + bool GetQueryKeepInCache() const { + return RequestEv->GetQueryKeepInCache(); } const TString& GetDatabase() const { - return RequestProto().GetDatabase(); + return RequestEv->GetDatabase(); } TString ExtractQueryText() const { @@ -164,11 +162,11 @@ struct TKqpQueryState { } return {}; } - return RequestProto().GetQuery(); + return RequestEv->GetQuery(); } const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { - return RequestProto().GetTopicOperations(); + return RequestEv->GetTopicOperations(); } bool NeedPersistentSnapshot() const { @@ -196,15 +194,16 @@ struct TKqpQueryState { } Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const { - if (!RequestProto().HasCollectStats()) { + if (!RequestEv->HasCollectStats()) { return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; } - if (RequestProto().GetCollectStats() == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) { + auto cStats = RequestEv->GetCollectStats(); + if (cStats == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) { return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; } - return RequestProto().GetCollectStats(); + return cStats; } bool CollectStatsDefined() const { @@ -212,7 +211,7 @@ struct TKqpQueryState { } bool HasPreparedQuery() const { - return RequestProto().HasPreparedQuery(); + return RequestEv->HasPreparedQuery(); } bool IsStreamResult() const { @@ -302,6 +301,7 @@ public: Config->FeatureFlags = AppData()->FeatureFlags; + RequestControls.Reqister(TlsActivationContext->AsActorContext()); Become(&TKqpSessionActor::ReadyState); } @@ -364,6 +364,8 @@ public: if (!ConvertParameters()) return; + QueryState->RequestEv->PrepareRemote(); + if (!WorkerId) { std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings, ModuleResolverState, Counters)); @@ -461,9 +463,8 @@ public: void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) { ui64 proxyRequestId = ev->Cookie; - auto& event = ev->Get()->Record; - YQL_ENSURE(event.GetRequest().GetSessionId() == SessionId, - "Invalid session, expected: " << SessionId << ", got: " << event.GetRequest().GetSessionId()); + YQL_ENSURE(ev->Get()->GetSessionId() == SessionId, + "Invalid session, expected: " << SessionId << ", got: " << ev->Get()->GetSessionId()); if (ev->Get()->HasYdbStatus() && ev->Get()->GetYdbStatus() != Ydb::StatusIds::SUCCESS) { NYql::TIssues issues; @@ -613,7 +614,7 @@ public: switch (QueryState->GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: query = TKqpQueryId(Settings.Cluster, Settings.Database, QueryState->GetQuery(), QueryState->GetType()); - keepInCache = QueryState->GetQueryCachePolicy().keep_in_cache() && query->IsSql(); + keepInCache = QueryState->GetQueryKeepInCache() && query->IsSql(); break; case NKikimrKqp::QUERY_ACTION_PREPARE: @@ -623,7 +624,7 @@ public: case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: uid = QueryState->GetPreparedQuery(); - keepInCache = QueryState->GetQueryCachePolicy().keep_in_cache(); + keepInCache = QueryState->GetQueryKeepInCache(); break; default: @@ -1282,6 +1283,8 @@ public: if (QueryState) { request.Orbit = std::move(QueryState->Orbit); } + request.PerRequestDataSizeLimit = RequestControls.PerRequestDataSizeLimit; + request.MaxShardCount = RequestControls.MaxShardCount; request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); @@ -1603,7 +1606,7 @@ public: break; case NKikimrKqp::QUERY_ACTION_EXECUTE: - replyQueryParameters = replyQueryId = QueryState->GetQueryCachePolicy().keep_in_cache(); + replyQueryParameters = replyQueryId = QueryState->GetQueryKeepInCache(); break; case NKikimrKqp::QUERY_ACTION_PARSE: @@ -2290,6 +2293,7 @@ private: std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse; std::optional<TSessionShutdownState> ShutdownState; TULIDGenerator UlidGen; + NTxProxy::TRequestControls RequestControls; }; } // namespace |