summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <[email protected]>2023-01-18 18:50:57 +0300
committergvit <[email protected]>2023-01-18 18:50:57 +0300
commitb8b4cd20df9ce74945f47b018e17cbbedf1f0580 (patch)
tree52760d8d257c459ec161377870dea02a751aec58
parent6de28661c7d56de0dc4169c2a2dc149bd3c66595 (diff)
fix query parameters size calculation & fix test
-rw-r--r--ydb/core/kqp/common/kqp.h23
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp30
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h6
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp70
4 files changed, 64 insertions, 65 deletions
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index a3bb419c4ef..009cac21c97 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -273,6 +273,28 @@ struct TEvKqp {
return false;
}
+ ui64 GetRequestSize() const {
+ return Record.GetRequest().ByteSizeLong();
+ }
+
+ ui64 GetQuerySize() const {
+ return Record.GetRequest().GetQuery().size();
+ }
+
+ ui64 GetParametersSize() const {
+ if (ParametersSize > 0) {
+ return ParametersSize;
+ }
+
+ ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong();
+ for(const auto& [name, param]: Record.GetRequest().GetYdbParameters()) {
+ ParametersSize += name.size();
+ ParametersSize += param.ByteSizeLong();
+ }
+
+ return ParametersSize;
+ }
+
ui32 CalculateSerializedSize() const override {
PrepareRemote();
return Record.ByteSize();
@@ -308,6 +330,7 @@ struct TEvKqp {
RequestCtx.reset();
}
}
+ mutable ui64 ParametersSize = 0;
mutable NKikimrKqp::TEvQueryRequest Record;
private:
mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx;
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 034804a1d91..d55e3d64b1b 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -294,17 +294,11 @@ void TKqpCountersBase::ReportCloseSession(ui64 requestSize) {
*YdbRequestBytes += requestSize;
}
-void TKqpCountersBase::ReportQueryRequest(const NKikimrKqp::TQueryRequest& request) {
- ReportQueryAction(request.GetAction());
- ReportQueryType(request.GetType());
-
- auto requestBytes = request.ByteSize();
- auto parametersBytes = request.GetParameters().ByteSize();
-
+void TKqpCountersBase::ReportQueryRequest(ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes) {
*RequestBytes += requestBytes;
*YdbRequestBytes += requestBytes;
- *QueryBytes += request.GetQuery().size();
+ *QueryBytes += queryBytes;
*ParametersBytes += parametersBytes;
*YdbParametersBytes += parametersBytes;
@@ -830,10 +824,24 @@ void TKqpCounters::ReportCloseSession(TKqpDbCountersPtr dbCounters, ui64 request
}
}
-void TKqpCounters::ReportQueryRequest(TKqpDbCountersPtr dbCounters, const NKikimrKqp::TQueryRequest& request) {
- TKqpCountersBase::ReportQueryRequest(request);
+void TKqpCounters::ReportQueryAction(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryAction action) {
+ TKqpCountersBase::ReportQueryAction(action);
+ if (dbCounters) {
+ dbCounters->ReportQueryAction(action);
+ }
+}
+
+void TKqpCounters::ReportQueryType(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryType type) {
+ TKqpCountersBase::ReportQueryType(type);
+ if (dbCounters) {
+ dbCounters->ReportQueryType(type);
+ }
+}
+
+void TKqpCounters::ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes) {
+ TKqpCountersBase::ReportQueryRequest(requestBytes, parametersBytes, queryBytes);
if (dbCounters) {
- dbCounters->ReportQueryRequest(request);
+ dbCounters->ReportQueryRequest(requestBytes, parametersBytes, queryBytes);
}
}
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index b42b9d81815..cdac762964b 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -41,7 +41,7 @@ protected:
void ReportCreateSession(ui64 requestSize);
void ReportPingSession(ui64 requestSize);
void ReportCloseSession(ui64 requestSize);
- void ReportQueryRequest(const NKikimrKqp::TQueryRequest& request);
+ void ReportQueryRequest(ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes);
void ReportQueryWithRangeScan();
void ReportQueryWithFullScan();
@@ -269,7 +269,9 @@ public:
void ReportCreateSession(TKqpDbCountersPtr dbCounters, ui64 requestSize);
void ReportPingSession(TKqpDbCountersPtr dbCounters, ui64 requestSize);
void ReportCloseSession(TKqpDbCountersPtr dbCounters, ui64 requestSize);
- void ReportQueryRequest(TKqpDbCountersPtr dbCounters, const NKikimrKqp::TQueryRequest& request);
+ void ReportQueryAction(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryAction action);
+ void ReportQueryType(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryType type);
+ void ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes);
void ReportResponseStatus(TKqpDbCountersPtr dbCounters, ui64 responseSize, Ydb::StatusIds::StatusCode ydbStatus);
void ReportResultsBytes(TKqpDbCountersPtr dbCounters, ui64 resultsSize);
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 24984a56f83..fa7d7fd7737 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -468,7 +468,8 @@ public:
dbCounters = Counters->GetDbCounters(request.GetDatabase());
}
- LogRequest(request, requestInfo, ev->Sender, dbCounters);
+ Counters->ReportCreateSession(dbCounters, request.ByteSize());
+ KQP_PROXY_LOG_D("Received create session request, trace_id: " << event.GetTraceId());
responseEv->Record.SetResourceExhausted(result.ResourceExhausted);
responseEv->Record.SetYdbStatus(result.YdbStatus);
@@ -490,7 +491,6 @@ public:
if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false,
request.GetDatabase(), false, result))
{
- LogRequest(request, requestInfo, ev->Sender, requestId, Counters->GetDbCounters(request.GetDatabase()));
ReplyProcessError(result.YdbStatus, result.Error, requestId);
return;
}
@@ -506,26 +506,24 @@ public:
}
PendingRequests.SetSessionId(requestId, sessionId, dbCounters);
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
+ Counters->ReportQueryRequest(dbCounters, ev->Get()->GetRequestSize(), ev->Get()->GetParametersSize(), ev->Get()->GetQuerySize());
+ Counters->ReportQueryAction(dbCounters, request.GetAction());
+ Counters->ReportQueryType(dbCounters, request.GetType());
auto queryLimitBytes = TableServiceConfig.GetQueryLimitBytes();
- if (queryLimitBytes && IsSqlQuery(request.GetType())) {
- auto querySizeBytes = request.GetQuery().size();
- if (querySizeBytes > queryLimitBytes) {
- TString error = TStringBuilder() << "Query text size exceeds limit (" << querySizeBytes << "b > " << queryLimitBytes << "b)";
- ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
- return;
- }
+ if (queryLimitBytes && IsSqlQuery(request.GetType()) && ev->Get()->GetQuerySize() > queryLimitBytes) {
+ TString error = TStringBuilder() << "Query text size exceeds limit ("
+ << ev->Get()->GetQuerySize() << "b > " << queryLimitBytes << "b)";
+ ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
+ return;
}
auto paramsLimitBytes = TableServiceConfig.GetParametersLimitBytes();
- if (paramsLimitBytes) {
- auto paramsBytes = request.GetParameters().ByteSizeLong();
- if (paramsBytes > paramsLimitBytes) {
- TString error = TStringBuilder() << "Parameters size exceeds limit (" << paramsBytes << "b > " << paramsLimitBytes << "b)";
- ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
- return;
- }
+ if (paramsLimitBytes && ev->Get()->GetParametersSize() > paramsLimitBytes) {
+ TString error = TStringBuilder() << "Parameters size exceeds limit ("
+ << ev->Get()->GetParametersSize() << "b > " << paramsLimitBytes << "b)";
+ ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
+ return;
}
if (request.HasTxControl() && request.GetTxControl().has_begin_tx()) {
@@ -573,7 +571,7 @@ public:
const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
- LogRequest(request, requestInfo, ev->Sender, dbCounters);
+ Counters->ReportCloseSession(dbCounters, request.ByteSize());
if (LocalSessions->IsPendingShutdown(sessionId) && dbCounters) {
Counters->ReportSessionGracefulShutdownHit(dbCounters);
@@ -601,7 +599,8 @@ public:
ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest);
const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
+ KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId);
+ Counters->ReportPingSession(dbCounters, request.ByteSize());
TActorId targetId;
if (sessionInfo) {
@@ -1058,39 +1057,6 @@ private:
Counters->ReportResponseStatus(dbCounters, event.ByteSize(), event.GetStatus());
}
- void LogRequest(const NKikimrKqp::TCloseSessionRequest& request,
- const TKqpRequestInfo& requestInfo, const TActorId& sender,
- TKqpDbCountersPtr dbCounters)
- {
- KQP_PROXY_LOG_D(requestInfo << "Received close session request, sender: " << sender << ", SessionId: " << request.GetSessionId());
- Counters->ReportCloseSession(dbCounters, request.ByteSize());
- }
-
- void LogRequest(const NKikimrKqp::TQueryRequest& request,
- const TKqpRequestInfo& requestInfo, const TActorId& sender, ui64 requestId,
- TKqpDbCountersPtr dbCounters)
- {
- KQP_PROXY_LOG_D(requestInfo << "Received new query request, sender: " << sender << ", RequestId: " << requestId
- << ", Query: \"" << request.GetQuery().substr(0, 10000) << "\"");
- Counters->ReportQueryRequest(dbCounters, request);
- }
-
- void LogRequest(const NKikimrKqp::TCreateSessionRequest& request,
- const TKqpRequestInfo& requestInfo, const TActorId& sender,
- TKqpDbCountersPtr dbCounters)
- {
- KQP_PROXY_LOG_D(requestInfo << "Received create session request, sender: " << sender);
- Counters->ReportCreateSession(dbCounters, request.ByteSize());
- }
-
- void LogRequest(const NKikimrKqp::TPingSessionRequest& request,
- const TKqpRequestInfo& requestInfo, const TActorId& sender, ui64 requestId,
- TKqpDbCountersPtr dbCounters)
- {
- KQP_PROXY_LOG_D(requestInfo << "Received ping session request, sender: " << sender << " selfID: " << SelfId() << ", RequestId: " << requestId);
- Counters->ReportPingSession(dbCounters, request.ByteSize());
- }
-
bool ReplyProcessError(Ydb::StatusIds::StatusCode ydbStatus, const TString& message, ui64 requestId)
{
auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message);