diff options
author | gvit <[email protected]> | 2023-01-18 18:50:57 +0300 |
---|---|---|
committer | gvit <[email protected]> | 2023-01-18 18:50:57 +0300 |
commit | b8b4cd20df9ce74945f47b018e17cbbedf1f0580 (patch) | |
tree | 52760d8d257c459ec161377870dea02a751aec58 | |
parent | 6de28661c7d56de0dc4169c2a2dc149bd3c66595 (diff) |
fix query parameters size calculation & fix test
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 23 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 30 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 70 |
4 files changed, 64 insertions, 65 deletions
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index a3bb419c4ef..009cac21c97 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -273,6 +273,28 @@ struct TEvKqp { return false; } + ui64 GetRequestSize() const { + return Record.GetRequest().ByteSizeLong(); + } + + ui64 GetQuerySize() const { + return Record.GetRequest().GetQuery().size(); + } + + ui64 GetParametersSize() const { + if (ParametersSize > 0) { + return ParametersSize; + } + + ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong(); + for(const auto& [name, param]: Record.GetRequest().GetYdbParameters()) { + ParametersSize += name.size(); + ParametersSize += param.ByteSizeLong(); + } + + return ParametersSize; + } + ui32 CalculateSerializedSize() const override { PrepareRemote(); return Record.ByteSize(); @@ -308,6 +330,7 @@ struct TEvKqp { RequestCtx.reset(); } } + mutable ui64 ParametersSize = 0; mutable NKikimrKqp::TEvQueryRequest Record; private: mutable std::shared_ptr<NGRpcService::IRequestCtxMtSafe> RequestCtx; diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 034804a1d91..d55e3d64b1b 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -294,17 +294,11 @@ void TKqpCountersBase::ReportCloseSession(ui64 requestSize) { *YdbRequestBytes += requestSize; } -void TKqpCountersBase::ReportQueryRequest(const NKikimrKqp::TQueryRequest& request) { - ReportQueryAction(request.GetAction()); - ReportQueryType(request.GetType()); - - auto requestBytes = request.ByteSize(); - auto parametersBytes = request.GetParameters().ByteSize(); - +void TKqpCountersBase::ReportQueryRequest(ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes) { *RequestBytes += requestBytes; *YdbRequestBytes += requestBytes; - *QueryBytes += request.GetQuery().size(); + *QueryBytes += queryBytes; *ParametersBytes += parametersBytes; *YdbParametersBytes += parametersBytes; @@ -830,10 +824,24 @@ void TKqpCounters::ReportCloseSession(TKqpDbCountersPtr dbCounters, ui64 request } } -void TKqpCounters::ReportQueryRequest(TKqpDbCountersPtr dbCounters, const NKikimrKqp::TQueryRequest& request) { - TKqpCountersBase::ReportQueryRequest(request); +void TKqpCounters::ReportQueryAction(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryAction action) { + TKqpCountersBase::ReportQueryAction(action); + if (dbCounters) { + dbCounters->ReportQueryAction(action); + } +} + +void TKqpCounters::ReportQueryType(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryType type) { + TKqpCountersBase::ReportQueryType(type); + if (dbCounters) { + dbCounters->ReportQueryType(type); + } +} + +void TKqpCounters::ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes) { + TKqpCountersBase::ReportQueryRequest(requestBytes, parametersBytes, queryBytes); if (dbCounters) { - dbCounters->ReportQueryRequest(request); + dbCounters->ReportQueryRequest(requestBytes, parametersBytes, queryBytes); } } diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index b42b9d81815..cdac762964b 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -41,7 +41,7 @@ protected: void ReportCreateSession(ui64 requestSize); void ReportPingSession(ui64 requestSize); void ReportCloseSession(ui64 requestSize); - void ReportQueryRequest(const NKikimrKqp::TQueryRequest& request); + void ReportQueryRequest(ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes); void ReportQueryWithRangeScan(); void ReportQueryWithFullScan(); @@ -269,7 +269,9 @@ public: void ReportCreateSession(TKqpDbCountersPtr dbCounters, ui64 requestSize); void ReportPingSession(TKqpDbCountersPtr dbCounters, ui64 requestSize); void ReportCloseSession(TKqpDbCountersPtr dbCounters, ui64 requestSize); - void ReportQueryRequest(TKqpDbCountersPtr dbCounters, const NKikimrKqp::TQueryRequest& request); + void ReportQueryAction(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryAction action); + void ReportQueryType(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryType type); + void ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes); void ReportResponseStatus(TKqpDbCountersPtr dbCounters, ui64 responseSize, Ydb::StatusIds::StatusCode ydbStatus); void ReportResultsBytes(TKqpDbCountersPtr dbCounters, ui64 resultsSize); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 24984a56f83..fa7d7fd7737 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -468,7 +468,8 @@ public: dbCounters = Counters->GetDbCounters(request.GetDatabase()); } - LogRequest(request, requestInfo, ev->Sender, dbCounters); + Counters->ReportCreateSession(dbCounters, request.ByteSize()); + KQP_PROXY_LOG_D("Received create session request, trace_id: " << event.GetTraceId()); responseEv->Record.SetResourceExhausted(result.ResourceExhausted); responseEv->Record.SetYdbStatus(result.YdbStatus); @@ -490,7 +491,6 @@ public: if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false, request.GetDatabase(), false, result)) { - LogRequest(request, requestInfo, ev->Sender, requestId, Counters->GetDbCounters(request.GetDatabase())); ReplyProcessError(result.YdbStatus, result.Error, requestId); return; } @@ -506,26 +506,24 @@ public: } PendingRequests.SetSessionId(requestId, sessionId, dbCounters); - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); + Counters->ReportQueryRequest(dbCounters, ev->Get()->GetRequestSize(), ev->Get()->GetParametersSize(), ev->Get()->GetQuerySize()); + Counters->ReportQueryAction(dbCounters, request.GetAction()); + Counters->ReportQueryType(dbCounters, request.GetType()); auto queryLimitBytes = TableServiceConfig.GetQueryLimitBytes(); - if (queryLimitBytes && IsSqlQuery(request.GetType())) { - auto querySizeBytes = request.GetQuery().size(); - if (querySizeBytes > queryLimitBytes) { - TString error = TStringBuilder() << "Query text size exceeds limit (" << querySizeBytes << "b > " << queryLimitBytes << "b)"; - ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); - return; - } + if (queryLimitBytes && IsSqlQuery(request.GetType()) && ev->Get()->GetQuerySize() > queryLimitBytes) { + TString error = TStringBuilder() << "Query text size exceeds limit (" + << ev->Get()->GetQuerySize() << "b > " << queryLimitBytes << "b)"; + ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); + return; } auto paramsLimitBytes = TableServiceConfig.GetParametersLimitBytes(); - if (paramsLimitBytes) { - auto paramsBytes = request.GetParameters().ByteSizeLong(); - if (paramsBytes > paramsLimitBytes) { - TString error = TStringBuilder() << "Parameters size exceeds limit (" << paramsBytes << "b > " << paramsLimitBytes << "b)"; - ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); - return; - } + if (paramsLimitBytes && ev->Get()->GetParametersSize() > paramsLimitBytes) { + TString error = TStringBuilder() << "Parameters size exceeds limit (" + << ev->Get()->GetParametersSize() << "b > " << paramsLimitBytes << "b)"; + ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); + return; } if (request.HasTxControl() && request.GetTxControl().has_begin_tx()) { @@ -573,7 +571,7 @@ public: const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; - LogRequest(request, requestInfo, ev->Sender, dbCounters); + Counters->ReportCloseSession(dbCounters, request.ByteSize()); if (LocalSessions->IsPendingShutdown(sessionId) && dbCounters) { Counters->ReportSessionGracefulShutdownHit(dbCounters); @@ -601,7 +599,8 @@ public: ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest); const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); + KQP_PROXY_LOG_D("Received ping session request, request_id: " << requestId << ", trace_id: " << traceId); + Counters->ReportPingSession(dbCounters, request.ByteSize()); TActorId targetId; if (sessionInfo) { @@ -1058,39 +1057,6 @@ private: Counters->ReportResponseStatus(dbCounters, event.ByteSize(), event.GetStatus()); } - void LogRequest(const NKikimrKqp::TCloseSessionRequest& request, - const TKqpRequestInfo& requestInfo, const TActorId& sender, - TKqpDbCountersPtr dbCounters) - { - KQP_PROXY_LOG_D(requestInfo << "Received close session request, sender: " << sender << ", SessionId: " << request.GetSessionId()); - Counters->ReportCloseSession(dbCounters, request.ByteSize()); - } - - void LogRequest(const NKikimrKqp::TQueryRequest& request, - const TKqpRequestInfo& requestInfo, const TActorId& sender, ui64 requestId, - TKqpDbCountersPtr dbCounters) - { - KQP_PROXY_LOG_D(requestInfo << "Received new query request, sender: " << sender << ", RequestId: " << requestId - << ", Query: \"" << request.GetQuery().substr(0, 10000) << "\""); - Counters->ReportQueryRequest(dbCounters, request); - } - - void LogRequest(const NKikimrKqp::TCreateSessionRequest& request, - const TKqpRequestInfo& requestInfo, const TActorId& sender, - TKqpDbCountersPtr dbCounters) - { - KQP_PROXY_LOG_D(requestInfo << "Received create session request, sender: " << sender); - Counters->ReportCreateSession(dbCounters, request.ByteSize()); - } - - void LogRequest(const NKikimrKqp::TPingSessionRequest& request, - const TKqpRequestInfo& requestInfo, const TActorId& sender, ui64 requestId, - TKqpDbCountersPtr dbCounters) - { - KQP_PROXY_LOG_D(requestInfo << "Received ping session request, sender: " << sender << " selfID: " << SelfId() << ", RequestId: " << requestId); - Counters->ReportPingSession(dbCounters, request.ByteSize()); - } - bool ReplyProcessError(Ydb::StatusIds::StatusCode ydbStatus, const TString& message, ui64 requestId) { auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message); |