aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-11 02:01:07 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-11 02:01:07 +0300
commitdce385094ed7a824e63041a66e1b74c9ec20bef1 (patch)
tree5724d8ef1a963374ff6088e91dfed561c9cdb85d
parent1b07c6b09dd4d79b1491e24dcc1c2599dc47f9d4 (diff)
downloadydb-dce385094ed7a824e63041a66e1b74c9ec20bef1.tar.gz
Add LWTrace probes into session_actor KIKIMR-14476
ref:603fa4188589f7e0e97ba939288a09f451e9d745
-rw-r--r--ydb/core/kqp/common/CMakeLists.txt2
-rw-r--r--ydb/core/kqp/common/kqp_lwtrace_probes.cpp14
-rw-r--r--ydb/core/kqp/common/kqp_lwtrace_probes.h48
-rw-r--r--ydb/core/kqp/common/ya.make4
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp42
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp5
6 files changed, 110 insertions, 5 deletions
diff --git a/ydb/core/kqp/common/CMakeLists.txt b/ydb/core/kqp/common/CMakeLists.txt
index 848ee452871..460eefd0f98 100644
--- a/ydb/core/kqp/common/CMakeLists.txt
+++ b/ydb/core/kqp/common/CMakeLists.txt
@@ -22,6 +22,7 @@ target_link_libraries(core-kqp-common PUBLIC
yql-core-issue
yql-dq-actors
yql-dq-common
+ library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-common PRIVATE
@@ -31,6 +32,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_transform.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
)
generate_enum_serilization(core-kqp-common
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h
diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.cpp b/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
new file mode 100644
index 00000000000..5950d7395f1
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
@@ -0,0 +1,14 @@
+#include "kqp_lwtrace_probes.h"
+
+#include <ydb/core/protos/kqp.pb.h>
+
+
+void TQueryType::ToString(TStoreType value, TString* out) {
+ *out = TStringBuilder() << (NKikimrKqp::EQueryType)value;
+}
+
+void TQueryAction::ToString(TStoreType value, TString* out) {
+ *out = TStringBuilder() << (NKikimrKqp::EQueryAction)value;
+}
+
+LWTRACE_DEFINE_PROVIDER(KQP_PROVIDER);
diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h
new file mode 100644
index 00000000000..5a9394ba09e
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h
@@ -0,0 +1,48 @@
+#pragma once
+
+#include <library/cpp/lwtrace/all.h>
+
+struct TQueryType {
+ using TStoreType = ui32;
+ using TFuncParam = ui32;
+
+ static void ToString(TStoreType value, TString* out);
+ static TStoreType ToStoreType(TFuncParam value) {
+ return value;
+ }
+};
+
+struct TQueryAction {
+ using TStoreType = ui32;
+ using TFuncParam = ui32;
+
+ static void ToString(TStoreType value, TString* out);
+ static TStoreType ToStoreType(TFuncParam value) {
+ return value;
+ }
+};
+
+#define KQP_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \
+ PROBE(KqpQueryRequest, GROUPS("KQP"), \
+ TYPES(TString, TQueryType, TQueryAction, TString), \
+ NAMES("database", "type", "action", "query")) \
+ PROBE(KqpQueryCompiled, GROUPS("KQP"), \
+ TYPES(TString), \
+ NAMES("compileResultStatus")) \
+ PROBE(KqpPhyQueryDefer, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("currentTx")) \
+ PROBE(KqpPhyQueryProposeTx, GROUPS("KQP"), \
+ TYPES(ui64, ui32, ui32, bool), \
+ NAMES("currentTx", "transactionsSize", "locksSize", "shouldAcquireLocks")) \
+ PROBE(KqpPhyQueryTxResponse, GROUPS("KQP"), \
+ TYPES(ui64, ui32), \
+ NAMES("currentTx", "resultsSize")) \
+ PROBE(KqpQueryReplySuccess, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("responseArenaSpaceUsed")) \
+ PROBE(KqpQueryReplyError, GROUPS("KQP"), \
+ TYPES(TString), \
+ NAMES("errMsg")) \
+/**/
+LWTRACE_DECLARE_PROVIDER(KQP_PROVIDER)
diff --git a/ydb/core/kqp/common/ya.make b/ydb/core/kqp/common/ya.make
index ca72ddeed80..2fd28814fca 100644
--- a/ydb/core/kqp/common/ya.make
+++ b/ydb/core/kqp/common/ya.make
@@ -17,6 +17,8 @@ SRCS(
kqp_yql.h
kqp_timeouts.h
kqp_timeouts.cpp
+ kqp_lwtrace_probes.h
+ kqp_lwtrace_probes.cpp
)
PEERDIR(
@@ -28,6 +30,8 @@ PEERDIR(
ydb/library/yql/core/issue
ydb/library/yql/dq/actors
ydb/library/yql/dq/common
+ library/cpp/lwtrace
+ #library/cpp/lwtrace/protos
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 97cca295cb0..de7ce965f13 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -1,5 +1,6 @@
#include "kqp_impl.h"
+#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/common/kqp_ru_calc.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
#include <ydb/core/kqp/common/kqp_transform.h>
@@ -23,6 +24,8 @@
#include <util/string/printf.h>
+LWTRACE_USING(KQP_PROVIDER);
+
namespace NKikimr {
namespace NKqp {
@@ -61,6 +64,7 @@ struct TKqpQueryState {
TString UserToken;
+ NLWTrace::TOrbit Orbit;
TString TxId; // User tx
bool Commit = true;
@@ -246,15 +250,18 @@ public:
"Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId());
MakeNewQueryState();
+
QueryState->Request.Swap(event.MutableRequest());
auto& queryRequest = QueryState->Request;
-
YQL_ENSURE(queryRequest.GetDatabase() == Settings.Database,
"Wrong database, expected:" << Settings.Database << ", got: " << queryRequest.GetDatabase());
YQL_ENSURE(queryRequest.HasAction());
auto action = queryRequest.GetAction();
+ LWTRACK(KqpQueryRequest, QueryState->Orbit, queryRequest.GetDatabase(),
+ queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED,
+ action, queryRequest.GetQuery());
LOG_D(requestInfo << "Received request,"
<< " proxyRequestId: " << proxyRequestId
<< " query: " << (queryRequest.HasQuery() ? queryRequest.GetQuery().Quote() : "")
@@ -352,6 +359,8 @@ public:
YQL_ENSURE(compileResult);
YQL_ENSURE(QueryState);
+ LWTRACK(KqpQueryCompiled, QueryState->Orbit, TStringBuilder() << compileResult->Status);
+
if (compileResult->Status != Ydb::StatusIds::SUCCESS) {
if (ReplyQueryCompileError(compileResult)) {
Cleanup();
@@ -389,7 +398,7 @@ public:
Become(&TKqpSessionActor::ExecuteState);
// Can reply inside (in case of deferred-only transactions) and become ReadyState
- ExecuteOrDeferr();
+ ExecuteOrDefer();
}
void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) {
@@ -474,6 +483,9 @@ public:
QueryState->QueryCtx->TimeProvider = TAppData::TimeProvider;
QueryState->QueryCtx->RandomProvider = TAppData::RandomProvider;
+ //const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
+ //ApplyTableOperations(QueryState->QueryCtx.Get(), phyQuery);
+
auto action = queryRequest.GetAction();
auto queryType = queryRequest.GetType();
@@ -648,7 +660,7 @@ public:
}
- void ExecuteOrDeferr() {
+ void ExecuteOrDefer() {
auto& txCtx = *QueryState->TxCtx;
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) {
@@ -662,6 +674,7 @@ public:
auto tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery,
&phyQuery.GetTransactions(QueryState->CurrentTx));
+
while (tx->GetHasEffects()) {
if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) {
ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST,
@@ -669,6 +682,7 @@ public:
return;
}
if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
+ LWTRACK(KqpPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
++QueryState->CurrentTx;
tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery,
&phyQuery.GetTransactions(QueryState->CurrentTx));
@@ -743,6 +757,8 @@ public:
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
}
+ LWTRACK(KqpPhyQueryProposeTx, QueryState->Orbit, QueryState->CurrentTx, request.Transactions.size(),
+ request.Locks.size(), request.AcquireLocksTxId.Defined());
SendToExecuter(std::move(request));
}
@@ -771,6 +787,7 @@ public:
return;
}
+ LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize());
// save tx results
auto& txResult = *response->MutableResult();
TVector<NKikimrMiniKQL::TResult> txResults;
@@ -804,7 +821,7 @@ public:
if (QueryState->PreparedQuery &&
QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) {
- ExecuteOrDeferr();
+ ExecuteOrDefer();
} else {
ReplySuccess();
}
@@ -823,6 +840,20 @@ public:
resStats->MutableCompilation()->Swap(&QueryState->CompileStats);
}
+ /*
+ void ApplyTableOperations(TKikimrQueryContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
+ TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end());
+ TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end());
+
+ auto isolationLevel = *txCtx->EffectiveIsolationLevel;
+ bool strictDml = true;
+
+ TExprContext ctx;
+ txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml,
+ EKikimrQueryType::Dml, ctx);
+ }
+ */
+
void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
Y_VERIFY(QueryState);
if (QueryState->TxId) {
@@ -922,6 +953,8 @@ public:
resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS);
Reply(std::move(resEv));
+ LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, arena->SpaceUsed());
+
Cleanup();
}
@@ -1149,6 +1182,7 @@ public:
auto ev = std::make_unique<TEvKqp::TEvQueryResponse>();
ev->Record.GetRef().SetYdbStatus(ydbStatus);
+ LWTRACK(KqpQueryReplyError, QueryState->Orbit, message);
auto* response = ev->Record.GetRef().MutableResponse();
auto *queryIssue = response->AddQueryIssues();
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index 4c444eee4ff..13f30767469 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -14,6 +14,7 @@
#include <ydb/core/kqp/runtime/kqp_spilling_file.h>
#include <ydb/core/kqp/runtime/kqp_spilling.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
+#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
@@ -26,8 +27,9 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/interconnect/interconnect.h>
-#include <library/cpp/resource/resource.h>
+#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/resource/resource.h>
namespace NKikimr::NKqp {
@@ -279,6 +281,7 @@ public:
, ModuleResolverState() {}
void Bootstrap() {
+ NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER));
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext());
ModuleResolverState = MakeIntrusive<TModuleResolverState>();