aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-02-21 14:59:22 +0300
committergvit <gvit@ydb.tech>2023-02-21 14:59:22 +0300
commitedb2549e05cf120e034386d1b1664947195ccf6c (patch)
treee83dde458964144eaf097ba4db1724ec061e0074
parentb2ea1197cdc3ea11737b036349f91986e3a90743 (diff)
downloadydb-edb2549e05cf120e034386d1b1664947195ccf6c.tar.gz
avoid copies into proto
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp5
-rw-r--r--ydb/core/kqp/common/kqp.h143
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp45
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp6
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp45
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp50
7 files changed, 150 insertions, 146 deletions
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index 170dd68c95..a3dbf019bb 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -138,9 +138,8 @@ public:
&req->tx_control(),
&req->parameters(),
req->collect_stats(),
- &req->query_cache_policy(),
- &req->operation_params());
- ev->PrepareRemote();
+ req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
+ req->has_operation_params() ? &req->operation_params() : nullptr);
ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 09d7ae7220..cdd7119ae5 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -256,7 +256,7 @@ struct TEvKqp {
struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
public:
TEvQueryRequest(
- std::shared_ptr<NGRpcService::IRequestCtxMtSafe> ctx,
+ const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx,
const TString& sessionId,
TActorId actorId,
TString&& yqlText,
@@ -267,22 +267,8 @@ struct TEvKqp {
const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters,
const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
- const ::Ydb::Operations::OperationParams* operationParams)
- : RequestCtx(ctx)
- , SessionId(sessionId)
- , YqlText(std::move(yqlText))
- , QueryId(std::move(queryId))
- , QueryAction(queryAction)
- , QueryType(queryType)
- , TxControl(txControl)
- , YdbParameters(ydbParameters)
- , CollectStats(collectStats)
- , QueryCachePolicy(queryCachePolicy)
- , OperationParams(operationParams)
- {
- ActorIdToProto(actorId, Record.MutableCancelationActor());
- }
-
+ const ::Ydb::Operations::OperationParams* operationParams,
+ bool keepSession = false);
TEvQueryRequest() = default;
@@ -290,77 +276,70 @@ struct TEvKqp {
return true;
}
- // Same as TEventPBBase but without Rope (but can contain Payload and will lose some data after all)
TEventSerializationInfo CreateSerializationInfo() const override { return {}; }
+ const TString& GetDatabase() const {
+ return RequestCtx ? Database : Record.GetRequest().GetDatabase();
+ }
+
bool HasYdbStatus() const {
- if (RequestCtx) {
- return false;
- }
+ return RequestCtx ? false : Record.HasYdbStatus();
+ }
- return Record.HasYdbStatus();
+ const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
+ return Record.GetRequest().GetTopicOperations();
}
bool HasTopicOperations() const {
return Record.GetRequest().HasTopicOperations();
}
+ bool GetKeepSession() const {
+ return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession();
+ }
+
+ TDuration GetCancelAfter() const {
+ return RequestCtx ? CancelAfter : TDuration::MilliSeconds(Record.GetRequest().GetCancelAfterMs());
+ }
+
+ TDuration GetOperationTimeout() const {
+ return RequestCtx ? OperationTimeout : TDuration::MilliSeconds(Record.GetRequest().GetTimeoutMs());
+ }
+
bool HasAction() const {
+ return RequestCtx ? true : Record.GetRequest().HasAction();
+ }
+
+ void SetSessionId(const TString& sessionId) {
if (RequestCtx) {
- // passed directly to constructor.
- return true;
+ SessionId = sessionId;
} else {
- return Record.GetRequest().HasAction();
+ Record.MutableRequest()->SetSessionId(sessionId);
}
}
const TString& GetSessionId() const {
- if (RequestCtx) {
- return SessionId;
- }
-
- return Record.GetRequest().GetSessionId();
+ return RequestCtx ? SessionId : Record.GetRequest().GetSessionId();
}
NKikimrKqp::EQueryAction GetAction() const {
- if (RequestCtx) {
- return QueryAction;
- }
-
- return Record.GetRequest().GetAction();
-
+ return RequestCtx ? QueryAction : Record.GetRequest().GetAction();
}
NKikimrKqp::EQueryType GetType() const {
- if (RequestCtx) {
- return QueryType;
- }
-
- return Record.GetRequest().GetType();
+ return RequestCtx ? QueryType : Record.GetRequest().GetType();
}
bool HasPreparedQuery() const {
- if (!QueryId.empty()) {
- return true;
- }
-
- return Record.GetRequest().HasPreparedQuery();
+ return RequestCtx ? QueryId.size() > 0 : Record.GetRequest().HasPreparedQuery();
}
const TString& GetPreparedQuery() const {
- if (!QueryId.empty()) {
- return QueryId;
- }
-
- return Record.GetRequest().GetPreparedQuery();
+ return RequestCtx ? QueryId : Record.GetRequest().GetPreparedQuery();
}
const TString& GetQuery() const {
- if (!YqlText.empty()) {
- return YqlText;
- }
-
- return Record.GetRequest().GetQuery();
+ return RequestCtx ? YqlText : Record.GetRequest().GetQuery();
}
const ::NKikimrMiniKQL::TParams& GetParameters() const {
@@ -368,35 +347,29 @@ struct TEvKqp {
}
const ::Ydb::Table::TransactionControl& GetTxControl() const {
- if (TxControl) {
- return *TxControl;
- }
-
- return Record.GetRequest().GetTxControl();
+ return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl();
}
bool GetUsePublicResponseDataFormat() const {
- if (RequestCtx) {
- return true;
- }
-
- return Record.GetRequest().GetUsePublicResponseDataFormat();
+ return RequestCtx ? true : Record.GetRequest().GetUsePublicResponseDataFormat();
}
- const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const {
- if (QueryCachePolicy) {
- return *QueryCachePolicy;
+ bool GetQueryKeepInCache() const {
+ if (RequestCtx) {
+ if (QueryCachePolicy != nullptr) {
+ return QueryCachePolicy->keep_in_cache();
+ }
+ return false;
}
-
- return Record.GetRequest().GetQueryCachePolicy();
+ return Record.GetRequest().GetQueryCachePolicy().keep_in_cache();
}
bool HasTxControl() const {
- if (TxControl) {
- return true;
- }
+ return RequestCtx ? TxControl != nullptr : Record.GetRequest().HasTxControl();
+ }
- return Record.GetRequest().HasTxControl();
+ bool HasCollectStats() const {
+ return RequestCtx ? true : Record.GetRequest().HasCollectStats();
}
TActorId GetRequestActorId() const {
@@ -405,9 +378,10 @@ struct TEvKqp {
const TString& GetTraceId() const {
if (RequestCtx) {
- if (auto traceId = RequestCtx->GetTraceId()) {
- return traceId.GetRef();
+ if (!TraceId) {
+ TraceId = RequestCtx->GetTraceId().GetOrElse("");
}
+ return TraceId;
}
return Record.GetTraceId();
@@ -415,9 +389,10 @@ struct TEvKqp {
const TString& GetRequestType() const {
if (RequestCtx) {
- if (auto requestType = RequestCtx->GetRequestType()) {
- return requestType.GetRef();
+ if (!RequestType) {
+ RequestType = RequestCtx->GetRequestType().GetOrElse("");
}
+ return RequestType;
}
return Record.GetRequestType();
@@ -460,7 +435,7 @@ struct TEvKqp {
}
ui64 GetQuerySize() const {
- return Record.GetRequest().GetQuery().size();
+ return RequestCtx ? YqlText.size() : Record.GetRequest().GetQuery().size();
}
ui64 GetParametersSize() const {
@@ -469,7 +444,7 @@ struct TEvKqp {
}
ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong();
- for(const auto& [name, param]: Record.GetRequest().GetYdbParameters()) {
+ for(const auto& [name, param]: GetYdbParameters()) {
ParametersSize += name.size();
ParametersSize += param.ByteSizeLong();
}
@@ -512,6 +487,9 @@ struct TEvKqp {
private:
mutable ui64 ParametersSize = 0;
mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx;
+ mutable TString TraceId;
+ mutable TString RequestType;
+ TString Database;
TString SessionId;
TString YqlText;
TString QueryId;
@@ -522,6 +500,9 @@ struct TEvKqp {
const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr;
const ::Ydb::Operations::OperationParams* OperationParams = nullptr;
+ bool KeepSession = false;
+ TDuration OperationTimeout;
+ TDuration CancelAfter;
};
struct TEvCloseSessionRequest : public TEventPB<TEvCloseSessionRequest,
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
index f236827552..89b8102373 100644
--- a/ydb/core/kqp/common/kqp_event_impl.cpp
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -5,14 +5,48 @@
namespace NKikimr::NKqp {
+TEvKqp::TEvQueryRequest::TEvQueryRequest(
+ const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx,
+ const TString& sessionId,
+ TActorId actorId,
+ TString&& yqlText,
+ TString&& queryId,
+ NKikimrKqp::EQueryAction queryAction,
+ NKikimrKqp::EQueryType queryType,
+ const ::Ydb::Table::TransactionControl* txControl,
+ const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters,
+ const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
+ const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
+ const ::Ydb::Operations::OperationParams* operationParams,
+ bool keepSession)
+ : RequestCtx(ctx)
+ , Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
+ , SessionId(sessionId)
+ , YqlText(std::move(yqlText))
+ , QueryId(std::move(queryId))
+ , QueryAction(queryAction)
+ , QueryType(queryType)
+ , TxControl(txControl)
+ , YdbParameters(ydbParameters)
+ , CollectStats(collectStats)
+ , QueryCachePolicy(queryCachePolicy)
+ , OperationParams(operationParams)
+ , KeepSession(keepSession)
+{
+ if (OperationParams) {
+ OperationTimeout = GetDuration(OperationParams->operation_timeout());
+ CancelAfter = GetDuration(OperationParams->cancel_after());
+ }
+ ActorIdToProto(actorId, Record.MutableCancelationActor());
+}
+
void TEvKqp::TEvQueryRequest::PrepareRemote() const {
if (RequestCtx) {
if (RequestCtx->GetInternalToken()) {
Record.SetUserToken(RequestCtx->GetInternalToken());
}
- Record.MutableRequest()->SetDatabase(
- CanonizePath(RequestCtx->GetDatabaseName().GetOrElse("")));
+ Record.MutableRequest()->SetDatabase(Database);
if (auto traceId = RequestCtx->GetTraceId()) {
Record.SetTraceId(traceId.GetRef());
@@ -50,11 +84,8 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
if (OperationParams) {
- const auto& operationTimeout = GetDuration(OperationParams->operation_timeout());
- const auto& cancelAfter = GetDuration(OperationParams->cancel_after());
-
- Record.MutableRequest()->SetCancelAfterMs(cancelAfter.MilliSeconds());
- Record.MutableRequest()->SetTimeoutMs(operationTimeout.MilliSeconds());
+ Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds());
+ Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds());
}
RequestCtx.reset();
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index e8258c7c5e..cfdb10293e 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -746,7 +746,7 @@ private:
item.SetFlags(affectedFlags);
}
- ui64 sizeLimit = RequestControls.PerRequestDataSizeLimit;
+ ui64 sizeLimit = Request.PerRequestDataSizeLimit;
if (Request.TotalReadSizeLimitBytes > 0) {
sizeLimit = sizeLimit
? std::min(sizeLimit, Request.TotalReadSizeLimitBytes)
@@ -1450,7 +1450,6 @@ private:
NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
- RequestControls.Reqister(TlsActivationContext->AsActorContext());
size_t readActors = 0;
ReadOnlyTx = !Request.TopicOperations.HasOperations();
@@ -1655,7 +1654,7 @@ private:
}
ui32 shardsLimit = Request.MaxAffectedShards;
- if (i64 msc = (i64) RequestControls.MaxShardCount; msc > 0) {
+ if (i64 msc = (i64) Request.MaxShardCount; msc > 0) {
shardsLimit = std::min(shardsLimit, (ui32) msc);
}
size_t shards = datashardTasks.size() + remoteComputeTasks.size();
@@ -2298,7 +2297,6 @@ private:
bool HasStreamLookup = false;
- NTxProxy::TRequestControls RequestControls;
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;
THashMap<ui64, TShardState> TopicTabletStates;
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 51eb219757..53c243a84f 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -108,6 +108,8 @@ public:
: TxAlloc(txAlloc)
{}
+ NKikimr::TControlWrapper PerRequestDataSizeLimit;
+ NKikimr::TControlWrapper MaxShardCount;
TVector<TPhysicalTxData> Transactions;
TMap<ui64, TVector<NKikimrTxDataShard::TLock>> DataShardLocks;
NKikimr::NKqp::TTxAllocatorState::TPtr TxAlloc;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 007d00e21f..55d2a26a50 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -482,37 +482,38 @@ public:
}
void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) {
- auto& event = ev->Get()->Record;
- auto& request = *event.MutableRequest();
- TString traceId = event.GetTraceId();
+ const TString& database = ev->Get()->GetDatabase();
+ const TString& traceId = ev->Get()->GetTraceId();
+ const auto queryType = ev->Get()->GetType();
+ const auto queryAction = ev->Get()->GetAction();
TKqpRequestInfo requestInfo(traceId);
ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvQueryRequest);
- if (request.GetSessionId().empty()) {
+ if (ev->Get()->GetSessionId().empty()) {
TProcessResult<TKqpSessionInfo*> result;
if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false,
- request.GetDatabase(), false, result))
+ database, false, result))
{
ReplyProcessError(result.YdbStatus, result.Error, requestId);
return;
}
- request.SetSessionId(result.Value->SessionId);
+ ev->Get()->SetSessionId(result.Value->SessionId);
}
- TString sessionId = request.GetSessionId();
+ const TString& sessionId = ev->Get()->GetSessionId();
const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
if (!dbCounters) {
- dbCounters = Counters->GetDbCounters(request.GetDatabase());
+ dbCounters = Counters->GetDbCounters(database);
}
PendingRequests.SetSessionId(requestId, sessionId, dbCounters);
Counters->ReportQueryRequest(dbCounters, ev->Get()->GetRequestSize(), ev->Get()->GetParametersSize(), ev->Get()->GetQuerySize());
- Counters->ReportQueryAction(dbCounters, request.GetAction());
- Counters->ReportQueryType(dbCounters, request.GetType());
+ Counters->ReportQueryAction(dbCounters, queryAction);
+ Counters->ReportQueryType(dbCounters, queryType);
auto queryLimitBytes = TableServiceConfig.GetQueryLimitBytes();
- if (queryLimitBytes && IsSqlQuery(request.GetType()) && ev->Get()->GetQuerySize() > queryLimitBytes) {
+ if (queryLimitBytes && IsSqlQuery(queryType) && ev->Get()->GetQuerySize() > queryLimitBytes) {
TString error = TStringBuilder() << "Query text size exceeds limit ("
<< ev->Get()->GetQuerySize() << "b > " << queryLimitBytes << "b)";
ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
@@ -527,26 +528,12 @@ public:
return;
}
- if (request.HasTxControl() && request.GetTxControl().has_begin_tx()) {
- switch (request.GetTxControl().begin_tx().tx_mode_case()) {
- case Ydb::Table::TransactionSettings::kSnapshotReadOnly:
- if (!AppData()->FeatureFlags.GetEnableMvccSnapshotReads()) {
- ReplyProcessError(Ydb::StatusIds::BAD_REQUEST,
- "Snapshot reads not supported in current database", requestId);
- return;
- }
-
- default:
- break;
- }
- }
-
TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
LocalSessions->StopIdleCheck(sessionInfo);
} else {
- targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId);
+ targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
if (!targetId) {
return;
}
@@ -556,10 +543,12 @@ public:
// because it is much better to give detailed error message rather than generic timeout.
// For example, it helps to avoid race in event order when worker and proxy recieve timeout at the same moment.
// If worker located in the different datacenter we should better substract some RTT estimate, but at this point it's not done.
- auto timeoutMs = GetQueryTimeout(request.GetType(), request.GetTimeoutMs(), TableServiceConfig) + DEFAULT_EXTRA_TIMEOUT_WAIT;
+ auto timeout = ev->Get()->GetOperationTimeout();
+ auto timeoutMs = GetQueryTimeout(queryType, timeout.MilliSeconds(), TableServiceConfig) + DEFAULT_EXTRA_TIMEOUT_WAIT;
StartQueryTimeout(requestId, timeoutMs);
Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
- KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "Sent request to target, requestId: " << requestId << ", targetId: " << targetId);
+ KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId
+ << ", targetId: " << targetId << ", sessionId: " << sessionId);
}
void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 9044d8f85e..033ee620fa 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -103,16 +103,12 @@ struct TKqpQueryState {
TDuration CpuTime;
std::optional<NCpuTime::TCpuTimer> CurrentTimer;
- const NKikimrKqp::TQueryRequest& RequestProto() const {
- return RequestEv->Record.GetRequest();
- }
-
NKikimrKqp::EQueryAction GetAction() const {
return RequestEv->GetAction();
}
bool GetKeepSession() const {
- return RequestProto().GetKeepSession();
+ return RequestEv->GetKeepSession();
}
const TString& GetQuery() const {
@@ -137,11 +133,13 @@ struct TKqpQueryState {
void SetQueryDeadlines(const NKikimrConfig::TTableServiceConfig& service) {
auto now = TAppData::TimeProvider->Now();
- if (RequestProto().GetCancelAfterMs()) {
- QueryDeadlines.CancelAt = now + TDuration::MilliSeconds(RequestProto().GetCancelAfterMs());
+ auto cancelAfter = RequestEv->GetCancelAfter();
+ auto timeout = RequestEv->GetOperationTimeout();
+ if (cancelAfter.MilliSeconds() > 0) {
+ QueryDeadlines.CancelAt = now + cancelAfter;
}
- auto timeoutMs = GetQueryTimeout(GetType(), RequestProto().GetTimeoutMs(), service);
+ auto timeoutMs = GetQueryTimeout(GetType(), timeout.MilliSeconds(), service);
QueryDeadlines.TimeoutAt = now + timeoutMs;
}
@@ -149,12 +147,12 @@ struct TKqpQueryState {
return RequestEv->HasTopicOperations();
}
- const ::Ydb::Table::QueryCachePolicy& GetQueryCachePolicy() const {
- return RequestEv->GetQueryCachePolicy();
+ bool GetQueryKeepInCache() const {
+ return RequestEv->GetQueryKeepInCache();
}
const TString& GetDatabase() const {
- return RequestProto().GetDatabase();
+ return RequestEv->GetDatabase();
}
TString ExtractQueryText() const {
@@ -164,11 +162,11 @@ struct TKqpQueryState {
}
return {};
}
- return RequestProto().GetQuery();
+ return RequestEv->GetQuery();
}
const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
- return RequestProto().GetTopicOperations();
+ return RequestEv->GetTopicOperations();
}
bool NeedPersistentSnapshot() const {
@@ -196,15 +194,16 @@ struct TKqpQueryState {
}
Ydb::Table::QueryStatsCollection::Mode GetStatsMode() const {
- if (!RequestProto().HasCollectStats()) {
+ if (!RequestEv->HasCollectStats()) {
return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
}
- if (RequestProto().GetCollectStats() == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) {
+ auto cStats = RequestEv->GetCollectStats();
+ if (cStats == Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED) {
return Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
}
- return RequestProto().GetCollectStats();
+ return cStats;
}
bool CollectStatsDefined() const {
@@ -212,7 +211,7 @@ struct TKqpQueryState {
}
bool HasPreparedQuery() const {
- return RequestProto().HasPreparedQuery();
+ return RequestEv->HasPreparedQuery();
}
bool IsStreamResult() const {
@@ -302,6 +301,7 @@ public:
Config->FeatureFlags = AppData()->FeatureFlags;
+ RequestControls.Reqister(TlsActivationContext->AsActorContext());
Become(&TKqpSessionActor::ReadyState);
}
@@ -364,6 +364,8 @@ public:
if (!ConvertParameters())
return;
+ QueryState->RequestEv->PrepareRemote();
+
if (!WorkerId) {
std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings,
ModuleResolverState, Counters));
@@ -461,9 +463,8 @@ public:
void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) {
ui64 proxyRequestId = ev->Cookie;
- auto& event = ev->Get()->Record;
- YQL_ENSURE(event.GetRequest().GetSessionId() == SessionId,
- "Invalid session, expected: " << SessionId << ", got: " << event.GetRequest().GetSessionId());
+ YQL_ENSURE(ev->Get()->GetSessionId() == SessionId,
+ "Invalid session, expected: " << SessionId << ", got: " << ev->Get()->GetSessionId());
if (ev->Get()->HasYdbStatus() && ev->Get()->GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
NYql::TIssues issues;
@@ -613,7 +614,7 @@ public:
switch (QueryState->GetAction()) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
query = TKqpQueryId(Settings.Cluster, Settings.Database, QueryState->GetQuery(), QueryState->GetType());
- keepInCache = QueryState->GetQueryCachePolicy().keep_in_cache() && query->IsSql();
+ keepInCache = QueryState->GetQueryKeepInCache() && query->IsSql();
break;
case NKikimrKqp::QUERY_ACTION_PREPARE:
@@ -623,7 +624,7 @@ public:
case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
uid = QueryState->GetPreparedQuery();
- keepInCache = QueryState->GetQueryCachePolicy().keep_in_cache();
+ keepInCache = QueryState->GetQueryKeepInCache();
break;
default:
@@ -1282,6 +1283,8 @@ public:
if (QueryState) {
request.Orbit = std::move(QueryState->Orbit);
}
+ request.PerRequestDataSizeLimit = RequestControls.PerRequestDataSizeLimit;
+ request.MaxShardCount = RequestControls.MaxShardCount;
request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
@@ -1603,7 +1606,7 @@ public:
break;
case NKikimrKqp::QUERY_ACTION_EXECUTE:
- replyQueryParameters = replyQueryId = QueryState->GetQueryCachePolicy().keep_in_cache();
+ replyQueryParameters = replyQueryId = QueryState->GetQueryKeepInCache();
break;
case NKikimrKqp::QUERY_ACTION_PARSE:
@@ -2290,6 +2293,7 @@ private:
std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse;
std::optional<TSessionShutdownState> ShutdownState;
TULIDGenerator UlidGen;
+ NTxProxy::TRequestControls RequestControls;
};
} // namespace