diff options
author | snaury <snaury@ydb.tech> | 2023-05-16 17:32:22 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-05-16 17:32:22 +0300 |
commit | a4abb45e22dc77aceeb14548f2f1fc820131fd51 (patch) | |
tree | 21538b48de8ca804985a8eff520acf341b0b3cf3 | |
parent | b0c5c30ee3bc0eb2c3c2c09f78add41af5cc0088 (diff) | |
download | ydb-a4abb45e22dc77aceeb14548f2f1fc820131fd51.tar.gz |
Add some lwtrace probes to datashard
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/complete_data_tx_unit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__propose_tx_base.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/probes.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/probes.h | 78 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 6 |
16 files changed, 199 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index 89b7f824ac..6f4d1978e5 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -112,6 +112,8 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote @@ -280,6 +282,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp @@ -371,6 +374,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 8ec494d2d8..3e69f6c4fa 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -113,6 +113,8 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote @@ -281,6 +283,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp @@ -373,6 +376,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 8ec494d2d8..3e69f6c4fa 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -113,6 +113,8 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote @@ -281,6 +283,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp @@ -373,6 +376,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index 48489bc23d..e819352bec 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -113,6 +113,8 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote @@ -281,6 +283,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_distributed_erase_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/prepare_scheme_tx_in_rs_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/protect_scheme_echoes_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp @@ -368,6 +371,8 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + library-cpp-lwtrace + cpp-lwtrace-mon monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote diff --git a/ydb/core/tx/datashard/complete_data_tx_unit.cpp b/ydb/core/tx/datashard/complete_data_tx_unit.cpp index 60bb976459..db8597b55d 100644 --- a/ydb/core/tx/datashard/complete_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/complete_data_tx_unit.cpp @@ -2,9 +2,12 @@ #include "datashard_impl.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" +#include "probes.h" #include <ydb/core/engine/minikql/minikql_engine_host.h> +LWTRACE_USING(DATASHARD_PROVIDER) + namespace NKikimr { namespace NDataShard { @@ -96,8 +99,10 @@ void TCompleteOperationUnit::CompleteOperation(TOperation::TPtr op, DataShard.FillExecutionStats(op->GetExecutionProfile(), *result); - if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) + if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) { + result->Orbit = std::move(op->Orbit); DataShard.SendResult(ctx, result, op->GetTarget(), op->GetStep(), op->GetTxId()); + } } Pipeline.RemoveCompletingOp(op); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index ab36bdc66d..84c1bb336c 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1,5 +1,6 @@ #include "datashard_impl.h" #include "datashard_txs.h" +#include "probes.h" #include <ydb/core/base/interconnect_channels.h> #include <ydb/core/engine/minikql/flat_local_tx_factory.h> @@ -12,6 +13,8 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> +LWTRACE_USING(DATASHARD_PROVIDER) + namespace NKikimr { IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info) { @@ -162,6 +165,8 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) ETxTypes_descriptor >()); TabletCounters = TabletCountersPtr.Get(); + + RegisterDataShardProbes(); } NTabletPipe::TClientConfig TDataShard::GetPipeClientConfig() { @@ -563,6 +568,7 @@ public: ui64 resultSize = Result->GetTxResult().size(); ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0); + LWTRACK(ProposeTransactionSendResult, Result->Orbit); Self->Send(Target, Result.Release(), flags); } @@ -605,6 +611,7 @@ void TDataShard::SendResult(const TActorContext &ctx, ui64 resultSize = res->GetTxResult().size(); ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0); + LWTRACK(ProposeTransactionSendResult, res->Orbit); ctx.Send(target, res.Release(), flags); } @@ -2483,6 +2490,7 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction* bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReason); if (reject) { + LWTRACK(ProposeTransactionReject, msg->Orbit); THolder<TEvDataShard::TEvProposeTransactionResult> result = THolder(new TEvDataShard::TEvProposeTransactionResult(msg->GetTxKind(), TabletID(), @@ -2507,12 +2515,16 @@ void TDataShard::UpdateProposeQueueSize() const { } void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) { + auto* msg = ev->Get(); + LWTRACK(ProposeTransactionRequest, msg->Orbit); + // Check if we need to delay an immediate transaction if (MediatorStateWaiting && (ev->Get()->GetFlags() & TTxFlags::Immediate) && !(ev->Get()->GetFlags() & TTxFlags::ForceOnline)) { // We cannot calculate correct version until we restore mediator state + LWTRACK(ProposeTransactionWaitMediatorState, msg->Orbit); MediatorStateWaitingMsgs.emplace_back(ev.Release()); UpdateProposeQueueSize(); return; @@ -2521,6 +2533,7 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc if (Pipeline.HasProposeDelayers()) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored"); + LWTRACK(ProposeTransactionWaitDelayers, msg->Orbit); DelayedProposeQueue.emplace_back().Reset(ev.Release()); UpdateProposeQueueSize(); return; @@ -2589,6 +2602,9 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransactionAttach::TPtr &ev, con } void TDataShard::HandleAsFollower(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) { + auto* msg = ev->Get(); + LWTRACK(ProposeTransactionRequest, msg->Orbit); + IncCounter(COUNTER_PREPARE_REQUEST); if (TxInFly() > GetMaxTxInFly()) { @@ -2629,11 +2645,12 @@ void TDataShard::CheckDelayedProposeQueue(const TActorContext &ctx) { } void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx) { + auto* msg = ev->Get(); bool mayRunImmediate = false; - if ((ev->Get()->GetFlags() & TTxFlags::Immediate) && - !(ev->Get()->GetFlags() & TTxFlags::ForceOnline) && - ev->Get()->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) + if ((msg->GetFlags() & TTxFlags::Immediate) && + !(msg->GetFlags() & TTxFlags::ForceOnline) && + msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) { // This transaction may run in immediate mode mayRunImmediate = true; @@ -2641,6 +2658,7 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr && if (mayRunImmediate) { // Enqueue immediate transactions so they don't starve existing operations + LWTRACK(ProposeTransactionEnqueue, msg->Orbit); ProposeQueue.Enqueue(std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx); UpdateProposeQueueSize(); } else { @@ -3051,6 +3069,8 @@ bool TDataShard::WaitPlanStep(ui64 step) { } bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const { + auto* msg = ev->Get(); + if (MvccSwitchState == TSwitchState::SWITCHING) { LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction needs to wait because of mvcc state switching"); return true; @@ -3062,6 +3082,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads()); if (rowVersion >= unreadableEdge) { LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge); + LWTRACK(ProposeTransactionWaitSnapshot, msg->Orbit, rowVersion.Step, rowVersion.TxId); return true; } } diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 2e16d04d00..2c3b33190d 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -13,6 +13,7 @@ #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/tablet_flat/flat_row_versions.h> +#include <library/cpp/lwtrace/shuttle.h> #include <library/cpp/time_provider/time_provider.h> namespace arrow { @@ -489,6 +490,9 @@ struct TEvDataShard { TStringBuf GetTxBody() const { return Record.GetTxBody(); } + + // Orbit used for tracking request events + NLWTrace::TOrbit Orbit; }; struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrTxDataShard::TEvCancelTransactionProposal, TEvDataShard::EvCancelTransactionProposal> { @@ -661,6 +665,10 @@ struct TEvDataShard { private: bool ForceOnline = false; bool ForceDirty = false; + + public: + // Orbit used for tracking request events + NLWTrace::TOrbit Orbit; }; struct TEvProposeTransactionRestart : public TEventPB<TEvProposeTransactionRestart, NKikimrTxDataShard::TEvProposeTransactionRestart, TEvDataShard::EvProposeTransactionRestart> { @@ -911,6 +919,9 @@ struct TEvDataShard { // True when TEvRead is cancelled while enqueued in a waiting queue bool Cancelled = false; + + // Orbit used for tracking request events + NLWTrace::TOrbit Orbit; }; struct TEvReadResult : public TEventPB<TEvReadResult, diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp index 650556bdd2..9f62e9246b 100644 --- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp +++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp @@ -1,10 +1,13 @@ #include "datashard_txs.h" #include "datashard_failpoints.h" #include "operation.h" +#include "probes.h" #include <ydb/core/util/pb.h> #include <ydb/core/base/wilson.h> +LWTRACE_USING(DATASHARD_PROVIDER) + namespace NKikimr { namespace NDataShard { @@ -89,6 +92,7 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa // Unsuccessful operation parse. if (op->IsAborted()) { + LWTRACK(ProposeTransactionParsed, op->Orbit, false); Y_VERIFY(op->Result()); if (ProposeTransactionSpan) { @@ -98,6 +102,8 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa return true; } + LWTRACK(ProposeTransactionParsed, op->Orbit, true); + op->BuildExecutionPlan(false); if (!op->IsExecutionPlanFinished()) Self->Pipeline.GetExecutionUnit(op->GetCurrentUnit()).AddOperation(op); diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index f917121b6c..c28049b556 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -3,6 +3,7 @@ #include "datashard_read_operation.h" #include "setup_sys_locks.h" #include "datashard_locks_db.h" +#include "probes.h" #include <ydb/core/formats/arrow/arrow_batch_builder.h> @@ -10,6 +11,8 @@ #include <utility> +LWTRACE_USING(DATASHARD_PROVIDER) + namespace NKikimr::NDataShard { using namespace NTabletFlatExecutor; @@ -1077,6 +1080,7 @@ public: } } + LWTRACK(ReadExecute, state.Orbit); if (!Read(txc, ctx, state)) return EExecutionStatus::Restart; @@ -1407,6 +1411,7 @@ public: Reader->FillResult(*Result, state); if (!gSkipReadIteratorResultFailPoint.Check(Self->TabletID())) { + LWTRACK(ReadSendResult, state.Orbit); Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); } } @@ -1953,6 +1958,7 @@ public: AppData()->MonotonicTimeProvider->Now(), Self)); + LWTRACK(ReadExecute, state.Orbit); if (Reader->Read(txc, ctx)) { // Retry later when dependencies are resolved if (!Reader->GetVolatileReadDependencies().empty()) { @@ -2082,6 +2088,7 @@ public: << ", firstUnprocessed# " << state.FirstUnprocessedQuery); Reader->FillResult(*Result, state); + LWTRACK(ReadSendResult, state.Orbit); Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); if (Reader->HasUnreadQueries()) { @@ -2116,6 +2123,8 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct return; } + LWTRACK(ReadRequest, request->Orbit, record.GetReadId()); + TReadIteratorId readId(ev->Sender, record.GetReadId()); if (!Pipeline.HandleWaitingReadIterator(readId, request)) { // This request has been cancelled @@ -2147,6 +2156,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct } if (MediatorStateWaiting) { + LWTRACK(ReadWaitMediatorState, request->Orbit); Pipeline.RegisterWaitingReadIterator(readId, request); MediatorStateWaitingMsgs.emplace_back(ev.Release()); UpdateProposeQueueSize(); @@ -2240,6 +2250,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct auto prioritizedMvccSnapshotReads = GetEnablePrioritizedMvccSnapshotReads(); TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads); if (readVersion >= unreadableEdge) { + LWTRACK(ReadWaitSnapshot, request->Orbit, readVersion.Step, readVersion.TxId); Pipeline.AddWaitingReadIterator(readVersion, std::move(ev), ctx); return; } @@ -2328,7 +2339,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct ReadIterators.emplace( readId, - new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now())); + new TReadIteratorState(sessionId, isHeadRead, AppData()->MonotonicTimeProvider->Now(), std::move(request->Orbit))); SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); @@ -2387,6 +2398,8 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& return; } + LWTRACK(ReadAck, state.Orbit); + // We received ACK on message we hadn't sent yet if (state.SeqNo < record.GetSeqNo()) { auto issueStr = TStringBuilder() << TabletID() << " ReadAck from future: " << record.GetSeqNo() @@ -2450,6 +2463,8 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte IncCounter(COUNTER_READ_ITERATOR_CANCEL); } + LWTRACK(ReadCancel, state->Orbit); + DeleteReadIterator(it); } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 7d8dd3a1d4..4fcb433049 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1197,6 +1197,7 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: tx->SetTarget(ev->Get()->GetSource()); tx->SetTxBody(rec.GetTxBody()); tx->SetCookie(ev->Cookie); + tx->Orbit = std::move(ev->Get()->Orbit); auto malformed = [&](const TStringBuf txType, const TString& txBody) { const TString error = TStringBuilder() << "Malformed " << txType << " tx" diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 5f111993c5..58d5898a23 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -4,10 +4,13 @@ #include "execution_unit_ctors.h" #include "setup_sys_locks.h" #include "datashard_locks_db.h" +#include "probes.h" #include <ydb/core/engine/minikql/minikql_engine_host.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> +LWTRACE_USING(DATASHARD_PROVIDER) + namespace NKikimr { namespace NDataShard { @@ -218,6 +221,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio dataTx->SetVolatileTxId(tx->GetTxId()); } + LWTRACK(ProposeTransactionKqpDataExecute, op->Orbit); + KqpCommitLocks(tabletId, tx, writeVersion, DataShard); auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx(); diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index 812b0796dd..dafbf08248 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -2,6 +2,9 @@ #include "datashard_impl.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" +#include "probes.h" + +LWTRACE_USING(DATASHARD_PROVIDER) namespace NKikimr { namespace NDataShard { @@ -180,6 +183,7 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op, DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size()); if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) { + LWTRACK(ProposeTransactionSendResult, op->Orbit); if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) { DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie()); } else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) { diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 62c1efe721..183d25acaa 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -870,6 +870,10 @@ private: public: std::optional<TRowVersion> MvccReadWriteVersion; + +public: + // Orbit used for tracking operation progress + NLWTrace::TOrbit Orbit; }; inline IOutputStream &operator <<(IOutputStream &out, diff --git a/ydb/core/tx/datashard/probes.cpp b/ydb/core/tx/datashard/probes.cpp new file mode 100644 index 0000000000..0cbd49a1e4 --- /dev/null +++ b/ydb/core/tx/datashard/probes.cpp @@ -0,0 +1,19 @@ +#include "probes.h" + +#include <library/cpp/lwtrace/mon/mon_lwtrace.h> + +#include <mutex> + +LWTRACE_DEFINE_PROVIDER(DATASHARD_PROVIDER) + +namespace NKikimr::NDataShard { + + void RegisterDataShardProbes() { + static std::once_flag flag; + + std::call_once(flag, []{ + NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DATASHARD_PROVIDER)); + }); + } + +} diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h new file mode 100644 index 0000000000..26bf44e87d --- /dev/null +++ b/ydb/core/tx/datashard/probes.h @@ -0,0 +1,78 @@ +#pragma once + +#include <library/cpp/lwtrace/all.h> + +#define DATASHARD_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ + PROBE(ProposeTransactionRequest, \ + GROUPS("DataShard", "LWTrackStart"), \ + TYPES(), \ + NAMES()) \ + PROBE(ProposeTransactionWaitMediatorState, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ProposeTransactionWaitDelayers, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ProposeTransactionWaitSnapshot, \ + GROUPS("DataShard"), \ + TYPES(ui64, ui64), \ + NAMES("snapshotStep", "snapshotTxId")) \ + PROBE(ProposeTransactionReject, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ProposeTransactionEnqueue, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ProposeTransactionParsed, \ + GROUPS("DataShard"), \ + TYPES(bool), \ + NAMES("success")) \ + PROBE(ProposeTransactionKqpDataExecute, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ProposeTransactionSendResult, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ReadRequest, \ + GROUPS("DataShard", "LWTrackStart"), \ + TYPES(ui64), \ + NAMES("readId")) \ + PROBE(ReadWaitMediatorState, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ReadWaitSnapshot, \ + GROUPS("DataShard"), \ + TYPES(ui64, ui64), \ + NAMES("snapshotStep", "snapshotTxId")) \ + PROBE(ReadExecute, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ReadSendResult, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ReadAck, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(ReadCancel, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ +// DATASHARD_PROVIDER + +LWTRACE_DECLARE_PROVIDER(DATASHARD_PROVIDER) + +namespace NKikimr::NDataShard { + + void RegisterDataShardProbes(); + +} diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index c2fae3e0cd..eb87c540c2 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -64,10 +64,11 @@ struct TReadIteratorState { }; public: - TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts) + TReadIteratorState(const TActorId& sessionId, bool isHeadRead, TMonotonic ts, NLWTrace::TOrbit&& orbit = {}) : IsHeadRead(isHeadRead) , SessionId(sessionId) , StartTs(ts) + , Orbit(std::move(orbit)) {} bool IsExhausted() const { return State == EState::Exhausted; } @@ -198,6 +199,9 @@ public: ui64 LastAckSeqNo = 0; ui32 FirstUnprocessedQuery = 0; TString LastProcessedKey = 0; + + // Orbit used for tracking progress + NLWTrace::TOrbit Orbit; }; using TReadIteratorStatePtr = std::unique_ptr<TReadIteratorState>; |