diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-11 02:01:07 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-11 02:01:07 +0300 |
commit | dce385094ed7a824e63041a66e1b74c9ec20bef1 (patch) | |
tree | 5724d8ef1a963374ff6088e91dfed561c9cdb85d | |
parent | 1b07c6b09dd4d79b1491e24dcc1c2599dc47f9d4 (diff) | |
download | ydb-dce385094ed7a824e63041a66e1b74c9ec20bef1.tar.gz |
Add LWTrace probes into session_actor KIKIMR-14476
ref:603fa4188589f7e0e97ba939288a09f451e9d745
-rw-r--r-- | ydb/core/kqp/common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_lwtrace_probes.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_lwtrace_probes.h | 48 | ||||
-rw-r--r-- | ydb/core/kqp/common/ya.make | 4 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 42 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 5 |
6 files changed, 110 insertions, 5 deletions
diff --git a/ydb/core/kqp/common/CMakeLists.txt b/ydb/core/kqp/common/CMakeLists.txt index 848ee452871..460eefd0f98 100644 --- a/ydb/core/kqp/common/CMakeLists.txt +++ b/ydb/core/kqp/common/CMakeLists.txt @@ -22,6 +22,7 @@ target_link_libraries(core-kqp-common PUBLIC yql-core-issue yql-dq-actors yql-dq-common + library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-common PRIVATE @@ -31,6 +32,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_transform.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ) generate_enum_serilization(core-kqp-common ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.cpp b/ydb/core/kqp/common/kqp_lwtrace_probes.cpp new file mode 100644 index 00000000000..5950d7395f1 --- /dev/null +++ b/ydb/core/kqp/common/kqp_lwtrace_probes.cpp @@ -0,0 +1,14 @@ +#include "kqp_lwtrace_probes.h" + +#include <ydb/core/protos/kqp.pb.h> + + +void TQueryType::ToString(TStoreType value, TString* out) { + *out = TStringBuilder() << (NKikimrKqp::EQueryType)value; +} + +void TQueryAction::ToString(TStoreType value, TString* out) { + *out = TStringBuilder() << (NKikimrKqp::EQueryAction)value; +} + +LWTRACE_DEFINE_PROVIDER(KQP_PROVIDER); diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h new file mode 100644 index 00000000000..5a9394ba09e --- /dev/null +++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h @@ -0,0 +1,48 @@ +#pragma once + +#include <library/cpp/lwtrace/all.h> + +struct TQueryType { + using TStoreType = ui32; + using TFuncParam = ui32; + + static void ToString(TStoreType value, TString* out); + static TStoreType ToStoreType(TFuncParam value) { + return value; + } +}; + +struct TQueryAction { + using TStoreType = ui32; + using TFuncParam = ui32; + + static void ToString(TStoreType value, TString* out); + static TStoreType ToStoreType(TFuncParam value) { + return value; + } +}; + +#define KQP_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ + PROBE(KqpQueryRequest, GROUPS("KQP"), \ + TYPES(TString, TQueryType, TQueryAction, TString), \ + NAMES("database", "type", "action", "query")) \ + PROBE(KqpQueryCompiled, GROUPS("KQP"), \ + TYPES(TString), \ + NAMES("compileResultStatus")) \ + PROBE(KqpPhyQueryDefer, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("currentTx")) \ + PROBE(KqpPhyQueryProposeTx, GROUPS("KQP"), \ + TYPES(ui64, ui32, ui32, bool), \ + NAMES("currentTx", "transactionsSize", "locksSize", "shouldAcquireLocks")) \ + PROBE(KqpPhyQueryTxResponse, GROUPS("KQP"), \ + TYPES(ui64, ui32), \ + NAMES("currentTx", "resultsSize")) \ + PROBE(KqpQueryReplySuccess, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("responseArenaSpaceUsed")) \ + PROBE(KqpQueryReplyError, GROUPS("KQP"), \ + TYPES(TString), \ + NAMES("errMsg")) \ +/**/ +LWTRACE_DECLARE_PROVIDER(KQP_PROVIDER) diff --git a/ydb/core/kqp/common/ya.make b/ydb/core/kqp/common/ya.make index ca72ddeed80..2fd28814fca 100644 --- a/ydb/core/kqp/common/ya.make +++ b/ydb/core/kqp/common/ya.make @@ -17,6 +17,8 @@ SRCS( kqp_yql.h kqp_timeouts.h kqp_timeouts.cpp + kqp_lwtrace_probes.h + kqp_lwtrace_probes.cpp ) PEERDIR( @@ -28,6 +30,8 @@ PEERDIR( ydb/library/yql/core/issue ydb/library/yql/dq/actors ydb/library/yql/dq/common + library/cpp/lwtrace + #library/cpp/lwtrace/protos ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 97cca295cb0..de7ce965f13 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -1,5 +1,6 @@ #include "kqp_impl.h" +#include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/kqp/common/kqp_ru_calc.h> #include <ydb/core/kqp/common/kqp_timeouts.h> #include <ydb/core/kqp/common/kqp_transform.h> @@ -23,6 +24,8 @@ #include <util/string/printf.h> +LWTRACE_USING(KQP_PROVIDER); + namespace NKikimr { namespace NKqp { @@ -61,6 +64,7 @@ struct TKqpQueryState { TString UserToken; + NLWTrace::TOrbit Orbit; TString TxId; // User tx bool Commit = true; @@ -246,15 +250,18 @@ public: "Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId()); MakeNewQueryState(); + QueryState->Request.Swap(event.MutableRequest()); auto& queryRequest = QueryState->Request; - YQL_ENSURE(queryRequest.GetDatabase() == Settings.Database, "Wrong database, expected:" << Settings.Database << ", got: " << queryRequest.GetDatabase()); YQL_ENSURE(queryRequest.HasAction()); auto action = queryRequest.GetAction(); + LWTRACK(KqpQueryRequest, QueryState->Orbit, queryRequest.GetDatabase(), + queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED, + action, queryRequest.GetQuery()); LOG_D(requestInfo << "Received request," << " proxyRequestId: " << proxyRequestId << " query: " << (queryRequest.HasQuery() ? queryRequest.GetQuery().Quote() : "") @@ -352,6 +359,8 @@ public: YQL_ENSURE(compileResult); YQL_ENSURE(QueryState); + LWTRACK(KqpQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status); + if (compileResult->Status != Ydb::StatusIds::SUCCESS) { if (ReplyQueryCompileError(compileResult)) { Cleanup(); @@ -389,7 +398,7 @@ public: Become(&TKqpSessionActor::ExecuteState); // Can reply inside (in case of deferred-only transactions) and become ReadyState - ExecuteOrDeferr(); + ExecuteOrDefer(); } void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) { @@ -474,6 +483,9 @@ public: QueryState->QueryCtx->TimeProvider = TAppData::TimeProvider; QueryState->QueryCtx->RandomProvider = TAppData::RandomProvider; + //const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); + //ApplyTableOperations(QueryState->QueryCtx.Get(), phyQuery); + auto action = queryRequest.GetAction(); auto queryType = queryRequest.GetType(); @@ -648,7 +660,7 @@ public: } - void ExecuteOrDeferr() { + void ExecuteOrDefer() { auto& txCtx = *QueryState->TxCtx; auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { @@ -662,6 +674,7 @@ public: auto tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx)); + while (tx->GetHasEffects()) { if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) { ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, @@ -669,6 +682,7 @@ public: return; } if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { + LWTRACK(KqpPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); ++QueryState->CurrentTx; tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx)); @@ -743,6 +757,8 @@ public: request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); } + LWTRACK(KqpPhyQueryProposeTx, QueryState->Orbit, QueryState->CurrentTx, request.Transactions.size(), + request.Locks.size(), request.AcquireLocksTxId.Defined()); SendToExecuter(std::move(request)); } @@ -771,6 +787,7 @@ public: return; } + LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize()); // save tx results auto& txResult = *response->MutableResult(); TVector<NKikimrMiniKQL::TResult> txResults; @@ -804,7 +821,7 @@ public: if (QueryState->PreparedQuery && QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) { - ExecuteOrDeferr(); + ExecuteOrDefer(); } else { ReplySuccess(); } @@ -823,6 +840,20 @@ public: resStats->MutableCompilation()->Swap(&QueryState->CompileStats); } + /* + void ApplyTableOperations(TKikimrQueryContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { + TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end()); + TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end()); + + auto isolationLevel = *txCtx->EffectiveIsolationLevel; + bool strictDml = true; + + TExprContext ctx; + txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, + EKikimrQueryType::Dml, ctx); + } + */ + void FillTxInfo(NKikimrKqp::TQueryResponse* response) { Y_VERIFY(QueryState); if (QueryState->TxId) { @@ -922,6 +953,8 @@ public: resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS); Reply(std::move(resEv)); + LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, arena->SpaceUsed()); + Cleanup(); } @@ -1149,6 +1182,7 @@ public: auto ev = std::make_unique<TEvKqp::TEvQueryResponse>(); ev->Record.GetRef().SetYdbStatus(ydbStatus); + LWTRACK(KqpQueryReplyError, QueryState->Orbit, message); auto* response = ev->Record.GetRef().MutableResponse(); auto *queryIssue = response->AddQueryIssues(); diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index 4c444eee4ff..13f30767469 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -14,6 +14,7 @@ #include <ydb/core/kqp/runtime/kqp_spilling_file.h> #include <ydb/core/kqp/runtime/kqp_spilling.h> #include <ydb/core/kqp/common/kqp_timeouts.h> +#include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/public/lib/operation_id/operation_id.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> @@ -26,8 +27,9 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/interconnect/interconnect.h> -#include <library/cpp/resource/resource.h> +#include <library/cpp/lwtrace/mon/mon_lwtrace.h> #include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/resource/resource.h> namespace NKikimr::NKqp { @@ -279,6 +281,7 @@ public: , ModuleResolverState() {} void Bootstrap() { + NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER)); Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext()); ModuleResolverState = MakeIntrusive<TModuleResolverState>(); |