aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-18 17:24:04 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-18 17:24:04 +0300
commitacb4bdaf2acd8cbbebd08d1cf0b0701e233eca7d (patch)
tree681564fd9682ecf9e1a59191ed7954be3d9b3bad
parentae99d9dd31bd36aa7cebc62c32faa43906cbb5ae (diff)
downloadydb-acb4bdaf2acd8cbbebd08d1cf0b0701e233eca7d.tar.gz
Add SysView reporting into session_actor KIKIMR-11938
ref:f27af80cb9a5b357e39f8fc6e3da85e9f321ba3d
-rw-r--r--ydb/core/kqp/kqp_impl.h1
-rw-r--r--ydb/core/kqp/kqp_response.cpp5
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp152
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp28
-rw-r--r--ydb/core/kqp/kqp_worker_common.h34
5 files changed, 174 insertions, 46 deletions
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index 4081965fda..f5240f5e81 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -88,6 +88,7 @@ TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const T
TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const NYql::TIssue& issue);
Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult);
+Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::TIssues& issues);
void AddQueryIssues(NKikimrKqp::TQueryResponse& response, const NYql::TIssues& issues);
bool HasSchemeOrFatalIssues(const NYql::TIssues& issues);
diff --git a/ydb/core/kqp/kqp_response.cpp b/ydb/core/kqp/kqp_response.cpp
index 3ca33f640e..8d96beaac4 100644
--- a/ydb/core/kqp/kqp_response.cpp
+++ b/ydb/core/kqp/kqp_response.cpp
@@ -118,9 +118,12 @@ Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& q
if (queryResult.Success()) {
return Ydb::StatusIds::SUCCESS;
}
+ return GetYdbStatus(queryResult.Issues());
+}
+Ydb::StatusIds::StatusCode GetYdbStatus(const TIssues& issues) {
TSet<Ydb::StatusIds::StatusCode> statuses;
- for (const auto& topIssue : queryResult.Issues()) {
+ for (const auto& topIssue : issues) {
CollectYdbStatuses(topIssue, statuses);
}
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 3452262989..0de65aaa1a 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -1,4 +1,5 @@
#include "kqp_impl.h"
+#include "kqp_worker_common.h"
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/common/kqp_ru_calc.h>
@@ -9,6 +10,7 @@
#include <ydb/core/kqp/prepare/kqp_prepare.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
+#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
@@ -23,6 +25,7 @@
#include <library/cpp/actors/core/log.h>
#include <util/string/printf.h>
+#include <util/string/escape.h>
LWTRACE_USING(KQP_PROVIDER);
@@ -46,6 +49,7 @@ struct TKqpQueryState {
TActorId Sender;
ui64 ProxyRequestId = 0;
NKikimrKqp::TQueryRequest Request;
+ ui64 ParametersSize = 0;
TPreparedQueryConstPtr PreparedQuery;
TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile CompileStats;
@@ -299,6 +303,7 @@ public:
QueryState->StartTime = TInstant::Now();
QueryState->UserToken = event.GetUserToken();
QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest);
+ QueryState->ParametersSize = queryRequest.GetParameters().ByteSize();
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
@@ -429,7 +434,9 @@ public:
QueryState->PreparedQuery = compileResult->PreparedQuery;
QueryState->Request.SetQuery(QueryState->PreparedQuery->GetText());
- PrepareQueryContext();
+ if (!PrepareQueryContext()) {
+ return;
+ }
Become(&TKqpSessionActor::ExecuteState);
// Can reply inside (in case of deferred-only transactions) and become ReadyState
@@ -490,19 +497,19 @@ public:
CreateNewTx();
}
- static std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
+ std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end());
TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end());
auto isolationLevel = *txCtx->EffectiveIsolationLevel;
- bool strictDml = true;
+ bool strictDml = Config->StrictDml.Get(Settings.Cluster).GetOrElse(false);
TExprContext ctx;
bool success = txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, EKikimrQueryType::Dml, ctx);
return {success, ctx.IssueManager.GetIssues()};
}
- void PrepareQueryContext() {
+ bool PrepareQueryContext() {
YQL_ENSURE(QueryState);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
@@ -538,8 +545,8 @@ public:
auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery);
if (!success) {
YQL_ENSURE(!issues.Empty());
- ReplyQueryError(requestInfo, *GetYdbStatus(issues.back()), "", MessageFromIssues(issues));
- return;
+ ReplyQueryError(requestInfo, GetYdbStatus(issues), "", MessageFromIssues(issues));
+ return false;
}
auto action = queryRequest.GetAction();
@@ -557,6 +564,7 @@ public:
"Unexpected query type, expected: QUERY_TYPE_PREPARED_DML, got: " << queryType);
ParseParameters(std::move(*QueryState->Request.MutableParameters()), QueryState->QueryCtx->Parameters);
+ return true;
}
static void ParseParameters(NKikimrMiniKQL::TParams&& parameters, TKikimrParamsMap& map) {
@@ -895,17 +903,123 @@ public:
ReplyQueryError(requestInfo, msg.GetStatusCode(), "Got AbortExecution", MessageFromIssues(issues));
}
- void FillStats(NKikimrKqp::TQueryResponse* response) {
- // TODO
- // Compile status
- // Execution stats (duration, comsumed RU)
+ TString ExtractQueryText() const {
+ auto compileResult = QueryState->CompileResult;
+ if (compileResult) {
+ if (compileResult->Query) {
+ return compileResult->Query->Text;
+ }
+ return {};
+ }
+ return QueryState->Request.GetQuery();
+ }
- auto* resStats = response->MutableQueryStats();
- resStats->Swap(&QueryState->Stats);
+ void CollectSystemViewQueryStats(const NKqpProto::TKqpStatsQuery* stats, TDuration queryDuration,
+ const TString& database, ui64 requestUnits)
+ {
+ auto type = QueryState->Request.GetType();
+ switch (type) {
+ case NKikimrKqp::QUERY_TYPE_SQL_DML:
+ case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: {
+ TString text = ExtractQueryText();
+ if (IsQueryAllowedToLog(text)) {
+ auto userSID = NACLib::TUserToken(QueryState->UserToken).GetUserSID();
+ NSysView::CollectQueryStats(TlsActivationContext->AsActorContext(), stats, queryDuration, text,
+ userSID, QueryState->ParametersSize, database, type, requestUnits);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
- resStats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds());
- //resStats->SetWorkerCpuTimeUs();
- resStats->MutableCompilation()->Swap(&QueryState->CompileStats);
+ void SlowLogQuery(const TActorContext &ctx, const TKqpRequestInfo& requestInfo, const TDuration& duration,
+ Ydb::StatusIds::StatusCode status, const std::function<ui64()>& resultsSizeFunc)
+ {
+ auto logSettings = ctx.LoggerSettings();
+ if (!logSettings) {
+ return;
+ }
+
+ ui32 thresholdMs = 0;
+ NActors::NLog::EPriority priority;
+
+ if (logSettings->Satisfies(NActors::NLog::PRI_TRACE, NKikimrServices::KQP_SLOW_LOG)) {
+ priority = NActors::NLog::PRI_TRACE;
+ thresholdMs = Config->_KqpSlowLogTraceThresholdMs.Get().GetRef();
+ } else if (logSettings->Satisfies(NActors::NLog::PRI_NOTICE, NKikimrServices::KQP_SLOW_LOG)) {
+ priority = NActors::NLog::PRI_NOTICE;
+ thresholdMs = Config->_KqpSlowLogNoticeThresholdMs.Get().GetRef();
+ } else if (logSettings->Satisfies(NActors::NLog::PRI_WARN, NKikimrServices::KQP_SLOW_LOG)) {
+ priority = NActors::NLog::PRI_WARN;
+ thresholdMs = Config->_KqpSlowLogWarningThresholdMs.Get().GetRef();
+ } else {
+ return;
+ }
+
+ if (duration >= TDuration::MilliSeconds(thresholdMs)) {
+ auto username = NACLib::TUserToken(QueryState->UserToken).GetUserSID();
+ if (username.empty()) {
+ username = "UNAUTHENTICATED";
+ }
+
+ auto queryText = ExtractQueryText();
+
+ auto paramsText = TStringBuilder()
+ << ToString(QueryState->ParametersSize)
+ << 'b';
+
+ ui64 resultsSize = 0;
+ if (resultsSizeFunc) {
+ resultsSize = resultsSizeFunc();
+ }
+
+ LOG_LOG_S(ctx, priority, NKikimrServices::KQP_SLOW_LOG, requestInfo
+ << "Slow query, duration: " << duration.ToString()
+ << ", status: " << status
+ << ", user: " << username
+ << ", results: " << resultsSize << 'b'
+ << ", text: \"" << EscapeC(queryText) << '"'
+ << ", parameters: " << paramsText);
+ }
+ }
+
+ void FillStats(NKikimrKqp::TEvQueryResponse* record) {
+ auto *response = record->MutableResponse();
+ auto* stats = response->MutableQueryStats();
+ stats->Swap(&QueryState->Stats);
+
+ stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds());
+ //stats->SetWorkerCpuTimeUs(QueryState->CpuTime.MicroSeconds());
+ if (QueryState->CompileResult) {
+ stats->MutableCompilation()->Swap(&QueryState->CompileStats);
+ }
+
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ YQL_ENSURE(QueryState);
+ const auto& queryRequest = QueryState->Request;
+
+ if (IsExecuteAction(queryRequest.GetAction())) {
+ auto ru = NRuCalc::CalcRequestUnit(*stats);
+ record->SetConsumedRu(ru);
+
+ auto now = TInstant::Now();
+ auto queryDuration = now - QueryState->StartTime;
+ CollectSystemViewQueryStats(stats, queryDuration, queryRequest.GetDatabase(), ru);
+ SlowLogQuery(TlsActivationContext->AsActorContext(), requestInfo, queryDuration, record->GetYdbStatus(),
+ [record]() {
+ ui64 resultsSize = 0;
+ for (auto& result : record->GetResponse().GetResults()) {
+ resultsSize += result.ByteSize();
+ }
+ return resultsSize;
+ }
+ );
+ }
}
void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
@@ -935,10 +1049,10 @@ public:
auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>();
std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena());
resEv->Record.Realloc(arena);
+ auto *record = &resEv->Record.GetRef();
+ auto *response = record->MutableResponse();
- auto *response = resEv->Record.GetRef().MutableResponse();
-
- FillStats(response);
+ FillStats(record);
if (QueryState->Commit) {
ResetTxState();
@@ -1370,7 +1484,7 @@ private:
LOG_E("Internal error, SelfId: " << SelfId() << ", message: " << message);
if (QueryState) {
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- bool canContinue = ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message);
+ bool canContinue = ReplyQueryError(requestInfo, Ydb::StatusIds::INTERNAL_ERROR, message);
if (!canContinue) {
PassAway();
}
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 94a89532d0..9cde5d9782 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -1,5 +1,6 @@
#include "kqp_impl.h"
#include "kqp_metadata_loader.h"
+#include "kqp_worker_common.h"
#include <ydb/core/kqp/common/kqp_ru_calc.h>
#include <ydb/core/actorlib_impl/long_timer.h>
@@ -1822,21 +1823,6 @@ private:
}
}
- bool IsQueryAllowedToLog(const TString& text) {
- static const TString user = "user";
- static const TString password = "password";
- auto itUser = std::search(text.begin(), text.end(), user.begin(), user.end(), [](const char a, const char b) -> bool {
- return std::tolower(a) == b;
- });
- if (itUser == text.end()) {
- return true;
- }
- auto itPassword = std::search(itUser, text.end(), password.begin(), password.end(), [](const char a, const char b) -> bool {
- return std::tolower(a) == b;
- });
- return itPassword == text.end();
- }
-
TString ExtractQueryText() const {
auto compileResult = QueryState->QueryCompileResult;
if (compileResult) {
@@ -1899,6 +1885,7 @@ private:
}
}
+
void FillCompileStatus(const TKqpCompileResult::TConstPtr& compileResult,
TEvKqp::TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>& record)
{
@@ -2152,17 +2139,6 @@ private:
return Nothing();
}
- static bool IsExecuteAction(const NKikimrKqp::EQueryAction& action) {
- switch (action) {
- case NKikimrKqp::QUERY_ACTION_EXECUTE:
- case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
- return true;
-
- default:
- return false;
- }
- }
-
static bool IsDocumentApiRestricted(const TString& requestType) {
return requestType != DocumentApiRequestType;
}
diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h
new file mode 100644
index 0000000000..e462630181
--- /dev/null
+++ b/ydb/core/kqp/kqp_worker_common.h
@@ -0,0 +1,34 @@
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/string.h>
+
+
+namespace NKikimr::NKqp {
+
+inline bool IsExecuteAction(const NKikimrKqp::EQueryAction& action) {
+ switch (action) {
+ case NKikimrKqp::QUERY_ACTION_EXECUTE:
+ case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
+ return true;
+
+ default:
+ return false;
+ }
+}
+
+inline bool IsQueryAllowedToLog(const TString& text) {
+ static const TString user = "user";
+ static const TString password = "password";
+ auto itUser = std::search(text.begin(), text.end(), user.begin(), user.end(),
+ [](const char a, const char b) -> bool { return std::tolower(a) == b; });
+ if (itUser == text.end()) {
+ return true;
+ }
+ auto itPassword = std::search(itUser, text.end(), password.begin(), password.end(),
+ [](const char a, const char b) -> bool { return std::tolower(a) == b; });
+ return itPassword == text.end();
+}
+
+}