aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2023-05-16 17:32:22 +0300
committer snaury <snaury@ydb.tech> 2023-05-16 17:32:22 +0300
commit a4abb45e22dc77aceeb14548f2f1fc820131fd51 (patch)
tree 21538b48de8ca804985a8eff520acf341b0b3cf3
parent b0c5c30ee3bc0eb2c3c2c09f78add41af5cc0088 (diff)
download ydb-a4abb45e22dc77aceeb14548f2f1fc820131fd51.tar.gz
Add some lwtrace probes to datashard
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt | 5
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt | 5
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt | 5
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt | 5
-rw-r--r-- ydb/core/tx/datashard/complete_data_tx_unit.cpp | 7
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp | 27
-rw-r--r-- ydb/core/tx/datashard/datashard.h | 11
-rw-r--r-- ydb/core/tx/datashard/datashard__propose_tx_base.cpp | 6
-rw-r--r-- ydb/core/tx/datashard/datashard__read_iterator.cpp | 17
-rw-r--r-- ydb/core/tx/datashard/datashard_pipeline.cpp | 1
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 5
-rw-r--r-- ydb/core/tx/datashard/finish_propose_unit.cpp | 4
-rw-r--r-- ydb/core/tx/datashard/operation.h | 4
-rw-r--r-- ydb/core/tx/datashard/probes.cpp | 19
-rw-r--r-- ydb/core/tx/datashard/probes.h | 78
-rw-r--r-- ydb/core/tx/datashard/read_iterator.h | 6
16 files changed, 199 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index 89b7f824ac..6f4d1978e5 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -112,6 +112,8 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
@@ -280,6 +282,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp
@@ -371,6 +374,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 8ec494d2d8..3e69f6c4fa 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -113,6 +113,8 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
@@ -281,6 +283,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp
@@ -373,6 +376,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 8ec494d2d8..3e69f6c4fa 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -113,6 +113,8 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
@@ -281,6 +283,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp
@@ -373,6 +376,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index 48489bc23d..e819352bec 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -113,6 +113,8 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
@@ -281,6 +283,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp
@@ -368,6 +371,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ library-cpp-lwtrace
+ cpp-lwtrace-mon
monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
diff --git a/ydb/core/tx/datashard/complete_data_tx_unit.cpp b/ydb/core/tx/datashard/complete_data_tx_unit.cpp
index 60bb976459..db8597b55d 100644
--- a/ydb/core/tx/datashard/complete_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/complete_data_tx_unit.cpp
@@ -2,9 +2,12 @@
#include "datashard_impl.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
+#include "probes.h"
#include <ydb/core/engine/minikql/minikql_engine_host.h>
+LWTRACE_USING(DATASHARD_PROVIDER)
+
namespace NKikimr {
namespace NDataShard {
@@ -96,8 +99,10 @@ void TCompleteOperationUnit::CompleteOperation(TOperation::TPtr op,
DataShard.FillExecutionStats(op->GetExecutionProfile(), *result);
- if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId()))
+ if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
+ result->Orbit = std::move(op->Orbit);
DataShard.SendResult(ctx, result, op->GetTarget(), op->GetStep(), op->GetTxId());
+ }
}
Pipeline.RemoveCompletingOp(op);
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index ab36bdc66d..84c1bb336c 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1,5 +1,6 @@
#include "datashard_impl.h"
#include "datashard_txs.h"
+#include "probes.h"
#include <ydb/core/base/interconnect_channels.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
@@ -12,6 +13,8 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+LWTRACE_USING(DATASHARD_PROVIDER)
+
namespace NKikimr {
IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info) {
@@ -162,6 +165,8 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
ETxTypes_descriptor
>());
TabletCounters = TabletCountersPtr.Get();
+
+ RegisterDataShardProbes();
}
NTabletPipe::TClientConfig TDataShard::GetPipeClientConfig() {
@@ -563,6 +568,7 @@ public:
ui64 resultSize = Result->GetTxResult().size();
ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0);
+ LWTRACK(ProposeTransactionSendResult, Result->Orbit);
Self->Send(Target, Result.Release(), flags);
}
@@ -605,6 +611,7 @@ void TDataShard::SendResult(const TActorContext &ctx,
ui64 resultSize = res->GetTxResult().size();
ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0);
+ LWTRACK(ProposeTransactionSendResult, res->Orbit);
ctx.Send(target, res.Release(), flags);
}
@@ -2483,6 +2490,7 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction*
bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReason);
if (reject) {
+ LWTRACK(ProposeTransactionReject, msg->Orbit);
THolder<TEvDataShard::TEvProposeTransactionResult> result =
THolder(new TEvDataShard::TEvProposeTransactionResult(msg->GetTxKind(),
TabletID(),
@@ -2507,12 +2515,16 @@ void TDataShard::UpdateProposeQueueSize() const {
}
void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
+ auto* msg = ev->Get();
+ LWTRACK(ProposeTransactionRequest, msg->Orbit);
+
// Check if we need to delay an immediate transaction
if (MediatorStateWaiting &&
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
!(ev->Get()->GetFlags() & TTxFlags::ForceOnline))
{
// We cannot calculate correct version until we restore mediator state
+ LWTRACK(ProposeTransactionWaitMediatorState, msg->Orbit);
MediatorStateWaitingMsgs.emplace_back(ev.Release());
UpdateProposeQueueSize();
return;
@@ -2521,6 +2533,7 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
if (Pipeline.HasProposeDelayers()) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored");
+ LWTRACK(ProposeTransactionWaitDelayers, msg->Orbit);
DelayedProposeQueue.emplace_back().Reset(ev.Release());
UpdateProposeQueueSize();
return;
@@ -2589,6 +2602,9 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransactionAttach::TPtr &ev, con
}
void TDataShard::HandleAsFollower(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
+ auto* msg = ev->Get();
+ LWTRACK(ProposeTransactionRequest, msg->Orbit);
+
IncCounter(COUNTER_PREPARE_REQUEST);
if (TxInFly() > GetMaxTxInFly()) {
@@ -2629,11 +2645,12 @@ void TDataShard::CheckDelayedProposeQueue(const TActorContext &ctx) {
}
void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx) {
+ auto* msg = ev->Get();
bool mayRunImmediate = false;
- if ((ev->Get()->GetFlags() & TTxFlags::Immediate) &&
- !(ev->Get()->GetFlags() & TTxFlags::ForceOnline) &&
- ev->Get()->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA)
+ if ((msg->GetFlags() & TTxFlags::Immediate) &&
+ !(msg->GetFlags() & TTxFlags::ForceOnline) &&
+ msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA)
{
// This transaction may run in immediate mode
mayRunImmediate = true;
@@ -2641,6 +2658,7 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&
if (mayRunImmediate) {
// Enqueue immediate transactions so they don't starve existing operations
+ LWTRACK(ProposeTransactionEnqueue, msg->Orbit);
ProposeQueue.Enqueue(std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
UpdateProposeQueueSize();
} else {
@@ -3051,6 +3069,8 @@ bool TDataShard::WaitPlanStep(ui64 step) {
}
bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const {
+ auto* msg = ev->Get();
+
if (MvccSwitchState == TSwitchState::SWITCHING) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction needs to wait because of mvcc state switching");
return true;
@@ -3062,6 +3082,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());
if (rowVersion >= unreadableEdge) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge);
+ LWTRACK(ProposeTransactionWaitSnapshot, msg->Orbit, rowVersion.Step, rowVersion.TxId);
return true;
}
}
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 2e16d04d00..2c3b33190d 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -13,6 +13,7 @@
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/tablet_flat/flat_row_versions.h>
+#include <library/cpp/lwtrace/shuttle.h>
#include <library/cpp/time_provider/time_provider.h>
namespace arrow {
@@ -489,6 +490,9 @@ struct TEvDataShard {
TStringBuf GetTxBody() const {
return Record.GetTxBody();
}
+
+ // Orbit used for tracking request events
+ NLWTrace::TOrbit Orbit;
};
struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrTxDataShard::TEvCancelTransactionProposal, TEvDataShard::EvCancelTransactionProposal> {
@@ -661,6 +665,10 @@ struct TEvDataShard {
private:
bool ForceOnline = false;
bool ForceDirty = false;
+
+ public:
+ // Orbit used for tracking request events
+ NLWTrace::TOrbit Orbit;
};
struct TEvProposeTransactionRestart : public TEventPB<TEvProposeTransactionRestart, NKikimrTxDataShard::TEvProposeTransactionRestart, TEvDataShard::EvProposeTransactionRestart> {
@@ -911,6 +919,9 @@ struct TEvDataShard {
// True when TEvRead is cancelled while enqueued in a waiting queue
bool Cancelled = false;
+
+ // Orbit used for tracking request events
+ NLWTrace::TOrbit Orbit;
};
struct TEvReadResult : public TEventPB<TEvReadResult,
diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
index 650556bdd2..9f62e9246b 100644
--- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
+++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
@@ -1,10 +1,13 @@
#include "datashard_txs.h"
#include "datashard_failpoints.h"
#include "operation.h"
+#include "probes.h"
#include <ydb/core/util/pb.h>
#include <ydb/core/base/wilson.h>
+LWTRACE_USING(DATASHARD_PROVIDER)
+
namespace NKikimr {
namespace NDataShard {
@@ -89,6 +92,7 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
// Unsuccessful operation parse.
if (op->IsAborted()) {
+ LWTRACK(ProposeTransactionParsed, op->Orbit, false);
Y_VERIFY(op->Result());
if (ProposeTransactionSpan) {
@@ -98,6 +102,8 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
return true;
}
+ LWTRACK(ProposeTransactionParsed, op->Orbit, true);
+
op->BuildExecutionPlan(false);
if (!op->IsExecutionPlanFinished())
Self->Pipeline.GetExecutionUnit(op->GetCurrentUnit()).AddOperation(op);
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index f917121b6c..c28049b556 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -3,6 +3,7 @@
#include "datashard_read_operation.h"
#include "setup_sys_locks.h"
#include "datashard_locks_db.h"
+#include "probes.h"
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
@@ -10,6 +11,8 @@
#include <utility>
+LWTRACE_USING(DATASHARD_PROVIDER)
+
namespace NKikimr::NDataShard {
using namespace NTabletFlatExecutor;
@@ -1077,6 +1080,7 @@ public:
}
}
+ LWTRACK(ReadExecute, state.Orbit);
if (!Read(txc, ctx, state))
return EExecutionStatus::Restart;
@@ -1407,6 +1411,7 @@ public:
Reader->FillResult(*Result, state);
if (!gSkipReadIteratorResultFailPoint.Check(Self->TabletID())) {
+ LWTRACK(ReadSendResult, state.Orbit);
Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
}
}
@@ -1953,6 +1958,7 @@ public:
AppData()->MonotonicTimeProvider->Now(),
Self));
+ LWTRACK(ReadExecute, state.Orbit);
if (Reader->Read(txc, ctx)) {
// Retry later when dependencies are resolved
if (!Reader->GetVolatileReadDependencies().empty()) {
@@ -2082,6 +2088,7 @@ public:
<< ", firstUnprocessed# " << state.FirstUnprocessedQuery);
Reader->FillResult(*Result, state);
+ LWTRACK(ReadSendResult, state.Orbit);
Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
if (Reader->HasUnreadQueries()) {
@@ -2116,6 +2123,8 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
return;
}
+ LWTRACK(ReadRequest, request->Orbit, record.GetReadId());
+
TReadIteratorId readId(ev->Sender, record.GetReadId());
if (!Pipeline.HandleWaitingReadIterator(readId, request)) {
// This request has been cancelled
@@ -2147,6 +2156,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
}
if (MediatorStateWaiting) {
+ LWTRACK(ReadWaitMediatorState, request->Orbit);
Pipeline.RegisterWaitingReadIterator(readId, request);
MediatorStateWaitingMsgs.emplace_back(ev.Release());
UpdateProposeQueueSize();
@@ -2240,6 +2250,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
auto prioritizedMvccSnapshotReads = GetEnablePrioritizedMvccSnapshotReads();
TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads);
if (readVersion >= unreadableEdge) {
+ LWTRACK(ReadWaitSnapshot, request->Orbit, readVersion.Step, readVersion.TxId);
Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx);
return;
}
@@ -2328,7 +2339,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
ReadIterators.emplace(
readId,
- new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now()));
+ new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now(), std::move(request->Orbit)));
SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
@@ -2387,6 +2398,8 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
return;
}
+ LWTRACK(ReadAck, state.Orbit);
+
// We received ACK on message we hadn't sent yet
if (state.SeqNo < record.GetSeqNo()) {
auto issueStr = TStringBuilder() << TabletID() << " ReadAck from future: " << record.GetSeqNo()
@@ -2450,6 +2463,8 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
IncCounter(COUNTER_READ_ITERATOR_CANCEL);
}
+ LWTRACK(ReadCancel, state->Orbit);
+
DeleteReadIterator(it);
}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 7d8dd3a1d4..4fcb433049 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1197,6 +1197,7 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
tx->SetTarget(ev->Get()->GetSource());
tx->SetTxBody(rec.GetTxBody());
tx->SetCookie(ev->Cookie);
+ tx->Orbit = std::move(ev->Get()->Orbit);
auto malformed = [&](const TStringBuf txType, const TString& txBody) {
const TString error = TStringBuilder() << "Malformed " << txType << " tx"
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 5f111993c5..58d5898a23 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -4,10 +4,13 @@
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
#include "datashard_locks_db.h"
+#include "probes.h"
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
+LWTRACE_USING(DATASHARD_PROVIDER)
+
namespace NKikimr {
namespace NDataShard {
@@ -218,6 +221,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
dataTx->SetVolatileTxId(tx->GetTxId());
}
+ LWTRACK(ProposeTransactionKqpDataExecute, op->Orbit);
+
KqpCommitLocks(tabletId, tx, writeVersion, DataShard);
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index 812b0796dd..dafbf08248 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -2,6 +2,9 @@
#include "datashard_impl.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
+#include "probes.h"
+
+LWTRACE_USING(DATASHARD_PROVIDER)
namespace NKikimr {
namespace NDataShard {
@@ -180,6 +183,7 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size());
if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
+ LWTRACK(ProposeTransactionSendResult, op->Orbit);
if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) {
DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie());
} else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) {
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 62c1efe721..183d25acaa 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -870,6 +870,10 @@ private:
public:
std::optional<TRowVersion> MvccReadWriteVersion;
+
+public:
+ // Orbit used for tracking operation progress
+ NLWTrace::TOrbit Orbit;
};
inline IOutputStream &operator <<(IOutputStream &out,
diff --git a/ydb/core/tx/datashard/probes.cpp b/ydb/core/tx/datashard/probes.cpp
new file mode 100644
index 0000000000..0cbd49a1e4
--- /dev/null
+++ b/ydb/core/tx/datashard/probes.cpp
@@ -0,0 +1,19 @@
+#include "probes.h"
+
+#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
+
+#include <mutex>
+
+LWTRACE_DEFINE_PROVIDER(DATASHARD_PROVIDER)
+
+namespace NKikimr::NDataShard {
+
+ void RegisterDataShardProbes() { // Adds the DATASHARD_PROVIDER probes to NLwTraceMonPage::ProbeRegistry(); called from the TDataShard constructor.
+ static std::once_flag flag; // Function-local static: one flag shared by all calls.
+
+ std::call_once(flag, []{ // The lambda body runs at most once, no matter how many times this function is called.
+ NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DATASHARD_PROVIDER));
+ });
+ }
+
+}
diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h
new file mode 100644
index 0000000000..26bf44e87d
--- /dev/null
+++ b/ydb/core/tx/datashard/probes.h
@@ -0,0 +1,78 @@
+#pragma once
+
+#include <library/cpp/lwtrace/all.h>
+
+#define DATASHARD_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) /* Probe list of the datashard lwtrace provider: each PROBE entry gives a probe name, its groups, its parameter types and its parameter names. */ \
+ PROBE(ProposeTransactionRequest, /* Fired on entry to TDataShard::Handle(TEvProposeTransaction) and HandleAsFollower. NOTE(review): the "LWTrackStart" group presumably marks probes that may start a track; confirm against the lwtrace documentation. */ \
+ GROUPS("DataShard", "LWTrackStart"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ProposeTransactionWaitMediatorState, /* Fired when an Immediate, non-ForceOnline proposal is parked in MediatorStateWaitingMsgs while MediatorStateWaiting is set. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ProposeTransactionWaitDelayers, /* Fired when a proposal is moved to DelayedProposeQueue because Pipeline.HasProposeDelayers() is true. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ProposeTransactionWaitSnapshot, /* Fired in CheckTxNeedWait when the tx row version is not before the unreadable edge; parameters are rowVersion.Step and rowVersion.TxId. */ \
+ GROUPS("DataShard"), \
+ TYPES(ui64, ui64), \
+ NAMES("snapshotStep", "snapshotTxId")) \
+ PROBE(ProposeTransactionReject, /* Fired in CheckDataTxRejectAndReply when CheckDataTxReject reports a reject. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ProposeTransactionEnqueue, /* Fired in ProposeTransaction just before a transaction that may run immediately is put into ProposeQueue. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ProposeTransactionParsed, /* Fired in TTxProposeTransactionBase::Execute; success is false when op->IsAborted(), true otherwise. */ \
+ GROUPS("DataShard"), \
+ TYPES(bool), \
+ NAMES("success")) \
+ PROBE(ProposeTransactionKqpDataExecute, /* Fired in TExecuteKqpDataTxUnit::Execute just before KqpCommitLocks. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ProposeTransactionSendResult, /* Fired just before a propose result is sent: in TDataShard::SendResult, in the Send(Target, Result...) path of datashard.cpp, and in TFinishProposeUnit::CompleteRequest. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ReadRequest, /* Fired in TDataShard::Handle(TEvRead) with record.GetReadId() as readId. NOTE(review): see the "LWTrackStart" remark on ProposeTransactionRequest. */ \
+ GROUPS("DataShard", "LWTrackStart"), \
+ TYPES(ui64), \
+ NAMES("readId")) \
+ PROBE(ReadWaitMediatorState, /* Fired when a TEvRead is parked in MediatorStateWaitingMsgs while MediatorStateWaiting is set. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ReadWaitSnapshot, /* Fired when readVersion is not before the unreadable edge and the read is added to the waiting read iterators; parameters are readVersion.Step and readVersion.TxId. */ \
+ GROUPS("DataShard"), \
+ TYPES(ui64, ui64), \
+ NAMES("snapshotStep", "snapshotTxId")) \
+ PROBE(ReadExecute, /* Fired right before the read itself is attempted (before Read(txc, ctx, state) and before Reader->Read(txc, ctx)). */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ReadSendResult, /* Fired just before Self->SendImmediateReadResult(...) is called with the filled result. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ReadAck, /* Fired in TDataShard::Handle(TEvReadAck) on the read iterator state's orbit. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(ReadCancel, /* Fired in TDataShard::Handle(TEvReadCancel) just before DeleteReadIterator. */ \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+// DATASHARD_PROVIDER
+
+LWTRACE_DECLARE_PROVIDER(DATASHARD_PROVIDER)
+
+namespace NKikimr::NDataShard {
+
+ void RegisterDataShardProbes();
+
+}
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index c2fae3e0cd..eb87c540c2 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -64,10 +64,11 @@ struct TReadIteratorState {
};
public:
- TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts)
+ TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts, NLWTrace::TOrbit&& orbit = {})
: IsHeadRead(isHeadRead)
, SessionId(sessionId)
, StartTs(ts)
+ , Orbit(std::move(orbit))
{}
bool IsExhausted() const { return State == EState::Exhausted; }
@@ -198,6 +199,9 @@ public:
ui64 LastAckSeqNo = 0;
ui32 FirstUnprocessedQuery = 0;
// Last key processed by this iterator; starts out empty. Default-constructed on purpose:
// initializing a string from the integer literal 0 is read as a null character pointer,
// not as "no key".
TString LastProcessedKey;
+
+ // Orbit used for tracking progress
+ NLWTrace::TOrbit Orbit;
};
using TReadIteratorStatePtr = std::unique_ptr<TReadIteratorState>;