diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-18 17:24:04 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-18 17:24:04 +0300 |
commit | acb4bdaf2acd8cbbebd08d1cf0b0701e233eca7d (patch) | |
tree | 681564fd9682ecf9e1a59191ed7954be3d9b3bad | |
parent | ae99d9dd31bd36aa7cebc62c32faa43906cbb5ae (diff) | |
download | ydb-acb4bdaf2acd8cbbebd08d1cf0b0701e233eca7d.tar.gz |
Add SysView reporting into session_actor KIKIMR-11938
ref:f27af80cb9a5b357e39f8fc6e3da85e9f321ba3d
-rw-r--r-- | ydb/core/kqp/kqp_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_response.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 152 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_common.h | 34 |
5 files changed, 174 insertions, 46 deletions
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index 4081965fda..f5240f5e81 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -88,6 +88,7 @@ TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const T TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const NYql::TIssue& issue); Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult); +Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::TIssues& issues); void AddQueryIssues(NKikimrKqp::TQueryResponse& response, const NYql::TIssues& issues); bool HasSchemeOrFatalIssues(const NYql::TIssues& issues); diff --git a/ydb/core/kqp/kqp_response.cpp b/ydb/core/kqp/kqp_response.cpp index 3ca33f640e..8d96beaac4 100644 --- a/ydb/core/kqp/kqp_response.cpp +++ b/ydb/core/kqp/kqp_response.cpp @@ -118,9 +118,12 @@ Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& q if (queryResult.Success()) { return Ydb::StatusIds::SUCCESS; } + return GetYdbStatus(queryResult.Issues()); +} +Ydb::StatusIds::StatusCode GetYdbStatus(const TIssues& issues) { TSet<Ydb::StatusIds::StatusCode> statuses; - for (const auto& topIssue : queryResult.Issues()) { + for (const auto& topIssue : issues) { CollectYdbStatuses(topIssue, statuses); } diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 3452262989..0de65aaa1a 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -1,4 +1,5 @@ #include "kqp_impl.h" +#include "kqp_worker_common.h" #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/kqp/common/kqp_ru_calc.h> @@ -9,6 +10,7 @@ #include <ydb/core/kqp/prepare/kqp_prepare.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> #include <ydb/core/kqp/provider/yql_kikimr_results.h> +#include <ydb/core/sys_view/service/sysview_service.h> #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> @@ -23,6 +25,7 @@ #include <library/cpp/actors/core/log.h> #include <util/string/printf.h> +#include <util/string/escape.h> LWTRACE_USING(KQP_PROVIDER); @@ -46,6 +49,7 @@ struct TKqpQueryState { TActorId Sender; ui64 ProxyRequestId = 0; NKikimrKqp::TQueryRequest Request; + ui64 ParametersSize = 0; TPreparedQueryConstPtr PreparedQuery; TKqpCompileResult::TConstPtr CompileResult; NKqpProto::TKqpStatsCompile CompileStats; @@ -299,6 +303,7 @@ public: QueryState->StartTime = TInstant::Now(); QueryState->UserToken = event.GetUserToken(); QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest); + QueryState->ParametersSize = queryRequest.GetParameters().ByteSize(); switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: @@ -429,7 +434,9 @@ public: QueryState->PreparedQuery = compileResult->PreparedQuery; QueryState->Request.SetQuery(QueryState->PreparedQuery->GetText()); - PrepareQueryContext(); + if (!PrepareQueryContext()) { + return; + } Become(&TKqpSessionActor::ExecuteState); // Can reply inside (in case of deferred-only transactions) and become ReadyState @@ -490,19 +497,19 @@ public: CreateNewTx(); } - static std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { + std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end()); TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end()); auto isolationLevel = *txCtx->EffectiveIsolationLevel; - bool strictDml = true; + bool strictDml = Config->StrictDml.Get(Settings.Cluster).GetOrElse(false); TExprContext ctx; bool success = txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, EKikimrQueryType::Dml, ctx); return {success, ctx.IssueManager.GetIssues()}; } - void PrepareQueryContext() { + bool PrepareQueryContext() { YQL_ENSURE(QueryState); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); @@ -538,8 +545,8 @@ public: auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery); if (!success) { YQL_ENSURE(!issues.Empty()); - ReplyQueryError(requestInfo, *GetYdbStatus(issues.back()), "", MessageFromIssues(issues)); - return; + ReplyQueryError(requestInfo, GetYdbStatus(issues), "", MessageFromIssues(issues)); + return false; } auto action = queryRequest.GetAction(); @@ -557,6 +564,7 @@ public: "Unexpected query type, expected: QUERY_TYPE_PREPARED_DML, got: " << queryType); ParseParameters(std::move(*QueryState->Request.MutableParameters()), QueryState->QueryCtx->Parameters); + return true; } static void ParseParameters(NKikimrMiniKQL::TParams&& parameters, TKikimrParamsMap& map) { @@ -895,17 +903,123 @@ public: ReplyQueryError(requestInfo, msg.GetStatusCode(), "Got AbortExecution", MessageFromIssues(issues)); } - void FillStats(NKikimrKqp::TQueryResponse* response) { - // TODO - // Compile status - // Execution stats (duration, comsumed RU) + TString ExtractQueryText() const { + auto compileResult = QueryState->CompileResult; + if (compileResult) { + if (compileResult->Query) { + return compileResult->Query->Text; + } + return {}; + } + return QueryState->Request.GetQuery(); + } - auto* resStats = response->MutableQueryStats(); - resStats->Swap(&QueryState->Stats); + void CollectSystemViewQueryStats(const NKqpProto::TKqpStatsQuery* stats, TDuration queryDuration, + const TString& database, ui64 requestUnits) + { + auto type = QueryState->Request.GetType(); + switch (type) { + case NKikimrKqp::QUERY_TYPE_SQL_DML: + case NKikimrKqp::QUERY_TYPE_PREPARED_DML: + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: { + TString text = ExtractQueryText(); + if (IsQueryAllowedToLog(text)) { + auto userSID = NACLib::TUserToken(QueryState->UserToken).GetUserSID(); + NSysView::CollectQueryStats(TlsActivationContext->AsActorContext(), stats, queryDuration, text, + userSID, QueryState->ParametersSize, database, type, requestUnits); + } + break; + } + default: + break; + } + } - resStats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds()); - //resStats->SetWorkerCpuTimeUs(); - resStats->MutableCompilation()->Swap(&QueryState->CompileStats); + void SlowLogQuery(const TActorContext &ctx, const TKqpRequestInfo& requestInfo, const TDuration& duration, + Ydb::StatusIds::StatusCode status, const std::function<ui64()>& resultsSizeFunc) + { + auto logSettings = ctx.LoggerSettings(); + if (!logSettings) { + return; + } + + ui32 thresholdMs = 0; + NActors::NLog::EPriority priority; + + if (logSettings->Satisfies(NActors::NLog::PRI_TRACE, NKikimrServices::KQP_SLOW_LOG)) { + priority = NActors::NLog::PRI_TRACE; + thresholdMs = Config->_KqpSlowLogTraceThresholdMs.Get().GetRef(); + } else if (logSettings->Satisfies(NActors::NLog::PRI_NOTICE, NKikimrServices::KQP_SLOW_LOG)) { + priority = NActors::NLog::PRI_NOTICE; + thresholdMs = Config->_KqpSlowLogNoticeThresholdMs.Get().GetRef(); + } else if (logSettings->Satisfies(NActors::NLog::PRI_WARN, NKikimrServices::KQP_SLOW_LOG)) { + priority = NActors::NLog::PRI_WARN; + thresholdMs = Config->_KqpSlowLogWarningThresholdMs.Get().GetRef(); + } else { + return; + } + + if (duration >= TDuration::MilliSeconds(thresholdMs)) { + auto username = NACLib::TUserToken(QueryState->UserToken).GetUserSID(); + if (username.empty()) { + username = "UNAUTHENTICATED"; + } + + auto queryText = ExtractQueryText(); + + auto paramsText = TStringBuilder() + << ToString(QueryState->ParametersSize) + << 'b'; + + ui64 resultsSize = 0; + if (resultsSizeFunc) { + resultsSize = resultsSizeFunc(); + } + + LOG_LOG_S(ctx, priority, NKikimrServices::KQP_SLOW_LOG, requestInfo + << "Slow query, duration: " << duration.ToString() + << ", status: " << status + << ", user: " << username + << ", results: " << resultsSize << 'b' + << ", text: \"" << EscapeC(queryText) << '"' + << ", parameters: " << paramsText); + } + } + + void FillStats(NKikimrKqp::TEvQueryResponse* record) { + auto *response = record->MutableResponse(); + auto* stats = response->MutableQueryStats(); + stats->Swap(&QueryState->Stats); + + stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds()); + //stats->SetWorkerCpuTimeUs(QueryState->CpuTime.MicroSeconds()); + if (QueryState->CompileResult) { + stats->MutableCompilation()->Swap(&QueryState->CompileStats); + } + + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + YQL_ENSURE(QueryState); + const auto& queryRequest = QueryState->Request; + + if (IsExecuteAction(queryRequest.GetAction())) { + auto ru = NRuCalc::CalcRequestUnit(*stats); + record->SetConsumedRu(ru); + + auto now = TInstant::Now(); + auto queryDuration = now - QueryState->StartTime; + CollectSystemViewQueryStats(stats, queryDuration, queryRequest.GetDatabase(), ru); + SlowLogQuery(TlsActivationContext->AsActorContext(), requestInfo, queryDuration, record->GetYdbStatus(), + [record]() { + ui64 resultsSize = 0; + for (auto& result : record->GetResponse().GetResults()) { + resultsSize += result.ByteSize(); + } + return resultsSize; + } + ); + } } void FillTxInfo(NKikimrKqp::TQueryResponse* response) { @@ -935,10 +1049,10 @@ public: auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>(); std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena()); resEv->Record.Realloc(arena); + auto *record = &resEv->Record.GetRef(); + auto *response = record->MutableResponse(); - auto *response = resEv->Record.GetRef().MutableResponse(); - - FillStats(response); + FillStats(record); if (QueryState->Commit) { ResetTxState(); @@ -1370,7 +1484,7 @@ private: LOG_E("Internal error, SelfId: " << SelfId() << ", message: " << message); if (QueryState) { auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - bool canContinue = ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message); + bool canContinue = ReplyQueryError(requestInfo, Ydb::StatusIds::INTERNAL_ERROR, message); if (!canContinue) { PassAway(); } diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 94a89532d0..9cde5d9782 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -1,5 +1,6 @@ #include "kqp_impl.h" #include "kqp_metadata_loader.h" +#include "kqp_worker_common.h" #include <ydb/core/kqp/common/kqp_ru_calc.h> #include <ydb/core/actorlib_impl/long_timer.h> @@ -1822,21 +1823,6 @@ private: } } - bool IsQueryAllowedToLog(const TString& text) { - static const TString user = "user"; - static const TString password = "password"; - auto itUser = std::search(text.begin(), text.end(), user.begin(), user.end(), [](const char a, const char b) -> bool { - return std::tolower(a) == b; - }); - if (itUser == text.end()) { - return true; - } - auto itPassword = std::search(itUser, text.end(), password.begin(), password.end(), [](const char a, const char b) -> bool { - return std::tolower(a) == b; - }); - return itPassword == text.end(); - } - TString ExtractQueryText() const { auto compileResult = QueryState->QueryCompileResult; if (compileResult) { @@ -1899,6 +1885,7 @@ private: } } + void FillCompileStatus(const TKqpCompileResult::TConstPtr& compileResult, TEvKqp::TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>& record) { @@ -2152,17 +2139,6 @@ private: return Nothing(); } - static bool IsExecuteAction(const NKikimrKqp::EQueryAction& action) { - switch (action) { - case NKikimrKqp::QUERY_ACTION_EXECUTE: - case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: - return true; - - default: - return false; - } - } - static bool IsDocumentApiRestricted(const TString& requestType) { return requestType != DocumentApiRequestType; } diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h new file mode 100644 index 0000000000..e462630181 --- /dev/null +++ b/ydb/core/kqp/kqp_worker_common.h @@ -0,0 +1,34 @@ +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/log.h> + +#include <util/datetime/base.h> +#include <util/generic/string.h> + + +namespace NKikimr::NKqp { + +inline bool IsExecuteAction(const NKikimrKqp::EQueryAction& action) { + switch (action) { + case NKikimrKqp::QUERY_ACTION_EXECUTE: + case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: + return true; + + default: + return false; + } +} + +inline bool IsQueryAllowedToLog(const TString& text) { + static const TString user = "user"; + static const TString password = "password"; + auto itUser = std::search(text.begin(), text.end(), user.begin(), user.end(), + [](const char a, const char b) -> bool { return std::tolower(a) == b; }); + if (itUser == text.end()) { + return true; + } + auto itPassword = std::search(itUser, text.end(), password.begin(), password.end(), + [](const char a, const char b) -> bool { return std::tolower(a) == b; }); + return itPassword == text.end(); +} + +} |