diff options
author | senya0x5f <senya@ydb.tech> | 2023-12-04 15:50:50 +0300 |
---|---|---|
committer | senya0x5f <senya@ydb.tech> | 2023-12-05 02:42:56 +0300 |
commit | 1ff2a79206cf2fd76d1725a55d800a8366d2ba46 (patch) | |
tree | 72b3e5d728846347ec72d5ac54cb2f19f207838b | |
parent | e3a62fefd2e31f8525a2cdc97d5bc71b13342b2c (diff) | |
download | ydb-1ff2a79206cf2fd76d1725a55d800a8366d2ba46.tar.gz |
KIKIMR-20324 Wilson Trace for puts in base tablet
-rw-r--r-- | ydb/core/tablet/tablet_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet/tablet_req_writelog.cpp | 78 | ||||
-rw-r--r-- | ydb/core/tablet/tablet_sys.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_exec_commit.h | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_exec_commit_mgr.h | 6 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_exec_seat.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_exec_seat.h | 23 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.cpp | 31 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_txloglogic.cpp | 23 | ||||
-rw-r--r-- | ydb/library/wilson_ids/wilson.h | 6 |
10 files changed, 154 insertions, 32 deletions
diff --git a/ydb/core/tablet/tablet_impl.h b/ydb/core/tablet/tablet_impl.h index 77e0bcaf04..f43a21b731 100644 --- a/ydb/core/tablet/tablet_impl.h +++ b/ydb/core/tablet/tablet_impl.h @@ -15,7 +15,7 @@ namespace NKikimr { IActor* CreateTabletReqRebuildHistoryGraph(const TActorId &owner, TTabletStorageInfo *info, ui32 blockedGen, NTracing::ITrace *trace, ui64 followerCookie); IActor* CreateTabletFindLastEntry(const TActorId &owner, bool readBody, TTabletStorageInfo *info, ui32 blockedGen, bool leader); -IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info); +IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, NWilson::TTraceId traceId = {}); IActor* CreateTabletReqBlockBlobStorage(const TActorId &owner, TTabletStorageInfo *info, ui32 generation, bool blockPrevEntry); IActor* CreateTabletReqDelete(const TActorId &owner, const TIntrusivePtr<TTabletStorageInfo> &tabletStorageInfo, ui32 generation = std::numeric_limits<ui32>::max()); diff --git a/ydb/core/tablet/tablet_req_writelog.cpp b/ydb/core/tablet/tablet_req_writelog.cpp index b3a065180d..864755528b 100644 --- a/ydb/core/tablet/tablet_req_writelog.cpp +++ b/ydb/core/tablet/tablet_req_writelog.cpp @@ -1,8 +1,10 @@ #include "tablet_impl.h" #include <ydb/core/base/blobstorage.h> +#include <ydb/core/tablet/tablet_metrics.h> #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/hfunc.h> -#include <ydb/core/tablet/tablet_metrics.h> +#include <ydb/library/actors/wilson/wilson_span.h> +#include <ydb/library/wilson_ids/wilson.h> #include <util/random/random.h> @@ -26,6 +28,9 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> { TVector<ui32> YellowMoveChannels; TVector<ui32> YellowStopChannels; + NWilson::TSpan RequestSpan; + TMap<TLogoBlobID, NWilson::TSpan> BlobSpans; + void Handle(TEvents::TEvUndelivered::TPtr&, const TActorContext &ctx) { return ReplyAndDie(NKikimrProto::ERROR, "BlobStorage proxy unavailable", ctx); } @@ -51,21 +56,51 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> { if (--RepliesToWait == 0) { if (Y_UNLIKELY(RequestCookies != ResponseCookies)) { - return ReplyAndDie(NKikimrProto::ERROR, "TEvPut and TEvPutResult cookies don't match", ctx); + TString err = "TEvPut and TEvPutResult cookies don't match"; + EndInnerSpanError(msg->Id, err); + return ReplyAndDie(NKikimrProto::ERROR, err, ctx); } + EndInnerSpanOk(msg->Id); return ReplyAndDie(NKikimrProto::OK, { }, ctx); } + EndInnerSpanOk(msg->Id); return; case NKikimrProto::RACE: // TODO: must be handled with retry case NKikimrProto::BLOCKED: + EndInnerSpanError(msg->Id, msg->ErrorReason); return ReplyAndDie(NKikimrProto::BLOCKED, msg->ErrorReason, ctx); default: + EndInnerSpanError(msg->Id, msg->ErrorReason); return ReplyAndDie(NKikimrProto::ERROR, msg->ErrorReason, ctx); } } + void EndInnerSpanOk(const TLogoBlobID& blobId) { + if (Y_UNLIKELY(RequestSpan)) { + auto span = BlobSpans.extract(blobId); + if (!span.empty()) { + span.mapped().EndOk(); + } + } + } + + void EndInnerSpanError(const TLogoBlobID& blobId, const TString& errorReason) { + if (Y_UNLIKELY(RequestSpan)) { + auto span = BlobSpans.extract(blobId); + if (!span.empty()) { + span.mapped().EndError(errorReason); + } + + for (auto& other : BlobSpans) { + other.second.EndError("Another request failed"); + } + + BlobSpans.clear(); + } + } + void ReplyAndDie(NKikimrProto::EReplyStatus status, const TString &reason, const TActorContext &ctx) { if (YellowMoveChannels) { SortUnique(YellowMoveChannels); @@ -74,6 +109,12 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> { SortUnique(YellowStopChannels); } + if (status == NKikimrProto::OK) { + RequestSpan.EndOk(); + } else { + RequestSpan.EndError(reason); + } + ctx.Send(Owner, new TEvTabletBase::TEvWriteLogResult( status, LogEntryID, @@ -87,7 +128,7 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> { void SendToBS(const TLogoBlobID &id, const TString &buffer, const TActorContext &ctx, const NKikimrBlobStorage::EPutHandleClass handleClass, - TEvBlobStorage::TEvPut::ETactic tactic) { + TEvBlobStorage::TEvPut::ETactic tactic, NWilson::TTraceId traceId) { Y_ABORT_UNLESS(id.TabletID() == Info->TabletID); const TTabletChannelInfo *channelInfo = Info->ChannelInfo(id.Channel()); Y_ABORT_UNLESS(channelInfo); @@ -97,7 +138,7 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> { ui64 cookie = RandomNumber<ui64>(); RequestCookies ^= cookie; - SendPutToGroup(ctx, x->GroupID, Info.Get(), MakeHolder<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), handleClass, tactic), cookie); + SendPutToGroup(ctx, x->GroupID, Info.Get(), MakeHolder<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), handleClass, tactic), cookie, std::move(traceId)); } public: @@ -105,13 +146,15 @@ public: return NKikimrServices::TActivity::TABLET_REQ_WRITE_LOG; } - TTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &logid, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info) + TTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &logid, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, + TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, NWilson::TTraceId traceId) : Owner(owner) , LogEntryID(logid) , LogEntry(entry) , CommitTactic(commitTactic) , Info(info) , RepliesToWait(Max<ui32>()) + , RequestSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.WriteLog") { References.swap(refs); Y_ABORT_UNLESS(Info); @@ -125,7 +168,15 @@ public: const auto handleClass = NKikimrBlobStorage::TabletLog; for (const auto &ref : References) { - SendToBS(ref.Id, ref.Buffer, ctx, handleClass, ref.Tactic ? *ref.Tactic : CommitTactic); + NWilson::TTraceId innerTraceId; + + if (RequestSpan) { + auto res = BlobSpans.try_emplace(ref.Id, TWilsonTablet::Tablet, RequestSpan.GetTraceId(), "Tablet.WriteLog.Reference"); + + innerTraceId = std::move(res.first->second.GetTraceId()); + } + + SendToBS(ref.Id, ref.Buffer, ctx, handleClass, ref.Tactic ? *ref.Tactic : CommitTactic, std::move(innerTraceId)); } const TLogoBlobID actualLogEntryId = TLogoBlobID( @@ -136,7 +187,16 @@ public: logEntryBuffer.size(), LogEntryID.Cookie() ); - SendToBS(actualLogEntryId, logEntryBuffer, ctx, NKikimrBlobStorage::TabletLog, CommitTactic); + + NWilson::TTraceId traceId; + + if (RequestSpan) { + auto res = BlobSpans.try_emplace(actualLogEntryId, TWilsonTablet::Tablet, RequestSpan.GetTraceId(), "Tablet.WriteLog.LogEntry"); + + traceId = std::move(res.first->second.GetTraceId()); + } + + SendToBS(actualLogEntryId, logEntryBuffer, ctx, NKikimrBlobStorage::TabletLog, CommitTactic, std::move(traceId)); RepliesToWait = References.size() + 1; Become(&TThis::StateWait); @@ -150,8 +210,8 @@ public: } }; -IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info) { - return new TTabletReqWriteLog(owner, entryId, entry, refs, commitTactic, info); +IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, NWilson::TTraceId traceId) { + return new TTabletReqWriteLog(owner, entryId, entry, refs, commitTactic, info, std::move(traceId)); } } diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp index ba48a94d8c..a2c0dbd2ee 100644 --- a/ydb/core/tablet/tablet_sys.cpp +++ b/ydb/core/tablet/tablet_sys.cpp @@ -1199,7 +1199,7 @@ bool TTablet::HandleNext(TEvTablet::TEvCommit::TPtr &ev) { entry->StateStorageConfirmed = true; // todo: do real query against state-storage (optionally?) entry->Task = Register( - CreateTabletReqWriteLog(SelfId(), logid, x.release(), msg->References, msg->CommitTactic, Info.Get()) + CreateTabletReqWriteLog(SelfId(), logid, x.release(), msg->References, msg->CommitTactic, Info.Get(), std::move(ev->TraceId)) ); Graph.StepsInFlight += 1; diff --git a/ydb/core/tablet_flat/flat_exec_commit.h b/ydb/core/tablet_flat/flat_exec_commit.h index 058d240912..92f8cc1fbd 100644 --- a/ydb/core/tablet_flat/flat_exec_commit.h +++ b/ydb/core/tablet_flat/flat_exec_commit.h @@ -27,10 +27,11 @@ namespace NTabletFlatExecutor { struct TLogCommit { using ETactic = TEvBlobStorage::TEvPut::ETactic; - TLogCommit(bool sync, ui32 step, ECommit type) + TLogCommit(bool sync, ui32 step, ECommit type, NWilson::TTraceId traceId) : Step(step) , Type(type) , Sync(sync) + , TraceId(std::move(traceId)) { } @@ -57,6 +58,7 @@ namespace NTabletFlatExecutor { TVector<TEvTablet::TLogEntryReference> Refs; TGCBlobDelta GcDelta; TVector<TEvTablet::TCommitMetadata> Metadata; + NWilson::TTraceId TraceId; TSeat *FirstTx = nullptr; TSeat *LastTx = nullptr; }; diff --git a/ydb/core/tablet_flat/flat_exec_commit_mgr.h b/ydb/core/tablet_flat/flat_exec_commit_mgr.h index b8801e891a..269c7e6e36 100644 --- a/ydb/core/tablet_flat/flat_exec_commit_mgr.h +++ b/ydb/core/tablet_flat/flat_exec_commit_mgr.h @@ -94,7 +94,7 @@ namespace NTabletFlatExecutor { return NTable::TTxStamp{ Gen, Head }; } - TAutoPtr<TLogCommit> Begin(bool sync, ECommit type) noexcept + TAutoPtr<TLogCommit> Begin(bool sync, ECommit type, NWilson::TTraceId traceId) noexcept { const auto step = Head; @@ -108,7 +108,7 @@ namespace NTabletFlatExecutor { Switch(Head += 1); /* detached commits moves head now */ } - return new TLogCommit(sync, step, type); + return new TLogCommit(sync, step, type, std::move(traceId)); } void Commit(TAutoPtr<TLogCommit> commit) noexcept @@ -177,7 +177,7 @@ namespace NTabletFlatExecutor { ev->GcLeft = std::move(commit.GcDelta.Deleted); ev->EmbeddedMetadata = std::move(commit.Metadata); - Ops->Send(Owner, ev, 0, ui64(commit.Type)); + Ops->Send(Owner, ev, 0, ui64(commit.Type), std::move(commit.TraceId)); } public: diff --git a/ydb/core/tablet_flat/flat_exec_seat.cpp b/ydb/core/tablet_flat/flat_exec_seat.cpp index e4f4e68322..1af617a924 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.cpp +++ b/ydb/core/tablet_flat/flat_exec_seat.cpp @@ -3,11 +3,20 @@ namespace NKikimr { namespace NTabletFlatExecutor { - void TSeat::Complete(const TActorContext& ctx) noexcept { + void TSeat::Complete(const TActorContext& ctx, bool isRW) noexcept { for (auto& callback : OnPersistent) { callback(); } Self->Complete(ctx); + + TxSpan.Attribute("rw", isRW); + TxSpan.EndOk(); + } + + void TSeat::Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept { + Self->Terminate(reason, ctx); + + TxSpan.EndError("Terminated"); } } // namespace NTabletFlatExecutor diff --git a/ydb/core/tablet_flat/flat_exec_seat.h b/ydb/core/tablet_flat/flat_exec_seat.h index 2b3cd3a9fd..cc1f37151d 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.h +++ b/ydb/core/tablet_flat/flat_exec_seat.h @@ -6,6 +6,8 @@ #include <util/generic/ptr.h> #include <util/system/hp_timer.h> +#include <ydb/library/actors/wilson/wilson_span.h> +#include <ydb/library/wilson_ids/wilson.h> namespace NKikimr { namespace NTabletFlatExecutor { @@ -15,9 +17,10 @@ namespace NTabletFlatExecutor { TSeat(const TSeat&) = delete; - TSeat(ui32 uniqId, TAutoPtr<ITransaction> self) + TSeat(ui32 uniqId, TAutoPtr<ITransaction> self, NWilson::TTraceId txTraceId) : UniqID(uniqId) , Self(self) + , TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction")) { } @@ -29,10 +32,26 @@ namespace NTabletFlatExecutor { out << "}"; } - void Complete(const TActorContext& ctx) noexcept; + void Complete(const TActorContext& ctx, bool isRW) noexcept; + + void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept; + + void CreateEnqueuedSpan() noexcept { + EnqueuedSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued"); + } + + void FinishEnqueuedSpan() noexcept { + EnqueuedSpan.EndOk(); + } + + NWilson::TTraceId GetTxTraceId() const noexcept { + return TxSpan.GetTraceId(); + } const ui64 UniqID = Max<ui64>(); const TAutoPtr<ITransaction> Self; + NWilson::TSpan TxSpan; + NWilson::TSpan EnqueuedSpan; ui64 Retries = 0; TPinned Pinned; diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index d64bae8270..5493dead54 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -33,6 +33,8 @@ #include <ydb/core/control/immediate_control_board_impl.h> #include <ydb/core/scheme/scheme_type_registry.h> #include <ydb/core/tablet/tablet_counters_aggregator.h> +#include <ydb/library/actors/wilson/wilson_span.h> +#include <ydb/library/wilson_ids/wilson.h> #include <ydb/library/yverify_stream/yverify_stream.h> #include <library/cpp/monlib/service/pages/templates.h> @@ -193,6 +195,7 @@ void TExecutor::RecreatePageCollectionsCache() noexcept for (auto &xpair : TransactionWaitPads) { auto &seat = xpair.second->Seat; LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); + seat->CreateEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; } @@ -516,6 +519,7 @@ void TExecutor::PlanTransactionActivation() { while (PendingQueue->Head() && (!limitTxInFly || (Stats->TxInFly - Stats->TxPending < limitTxInFly))) { TAutoPtr<TSeat> seat = PendingQueue->Pop(); LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); + seat->CreateEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; --Stats->TxPending; @@ -535,6 +539,7 @@ void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueue if (auto it = TransactionWaitPads.find(waitPad); it != TransactionWaitPads.end()) { auto &seat = it->second->Seat; LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); + seat->CreateEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; TransactionWaitPads.erase(waitPad); @@ -1522,7 +1527,7 @@ TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) { } else if (!LeaseDropped) { LogicRedo->FlushBatchedLog(); - auto commit = CommitManager->Begin(true, ECommit::Misc); + auto commit = CommitManager->Begin(true, ECommit::Misc, {}); lease = AttachLeaseCommit(commit.Get(), /* force */ true); @@ -1572,7 +1577,9 @@ bool TExecutor::CanExecuteTransaction() const { void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx) { Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation"); - TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self); + NWilson::TTraceId traceId; + + TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self, std::move(traceId)); LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self)); @@ -1616,6 +1623,7 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) { LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); + seat->CreateEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; PlanTransactionActivation(); @@ -1648,9 +1656,15 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct txc.NotEnoughMemory(seat->NotEnoughMemoryCount); Database->Begin(Stamp(), env); + LWTRACK(TransactionExecuteBegin, seat->Self->Orbit, seat->UniqID); + + NWilson::TSpan txExecuteSpan(TWilsonTablet::Tablet, seat->GetTxTraceId(), "Tablet.Transaction.Execute"); const bool done = seat->Self->Execute(txc, ctx.MakeFor(OwnerActorId)); + txExecuteSpan.EndOk(); + LWTRACK(TransactionExecuteEnd, seat->Self->Orbit, seat->UniqID, done); + seat->CPUExecTime += cpuTimer.PassedReset(); if (done) { @@ -1842,6 +1856,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv & // then tx may be re-activated. if (!PrivatePageCache->GetStats().CurrentCacheMisses) { LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); + seat->CreateEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; PlanTransactionActivation(); @@ -1925,7 +1940,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv if (Stats->IsFollower) { --Stats->TxInFly; Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly; - seat->Self->Terminate(seat->TerminationReason, OwnerCtx()); + seat->Terminate(seat->TerminationReason, OwnerCtx()); } else if (LogicRedo->TerminateTransaction(seat, ctx, OwnerActorId)) { --Stats->TxInFly; Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly; @@ -2423,7 +2438,7 @@ void TExecutor::MakeLogSnapshot() { LogicRedo->FlushBatchedLog(); - auto commit = CommitManager->Begin(true, ECommit::Snap); + auto commit = CommitManager->Begin(true, ECommit::Snap, {}); NKikimrExecutorFlat::TLogSnapshot snap; @@ -2537,6 +2552,7 @@ void TExecutor::Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorC if (TAutoPtr<TSeat> seat = ActivationQueue->Pop()) { Y_ABORT_UNLESS(ActivateTransactionWaiting > 0); ActivateTransactionWaiting--; + seat->FinishEnqueuedSpan(); ExecuteTransaction(seat, ctx); } else { // N.B. it should actually never happen, since ActivationQueue size @@ -2928,6 +2944,7 @@ void TExecutor::StartSeat(ui64 task, TResource *cookie_) noexcept PostponedTransactions.erase(it); Memory->AcquiredMemory(*seat, task); LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); + seat->CreateEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; PlanTransactionActivation(); @@ -2937,7 +2954,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable:: { LogicRedo->FlushBatchedLog(); - auto commit = CommitManager->Begin(true, ECommit::Misc); + auto commit = CommitManager->Begin(true, ECommit::Misc, {}); if (params && params->Edge.Head == NTable::TEpoch::Max()) { auto redo = Database->SnapshotToLog(table, { Generation(), commit->Step }); @@ -3230,7 +3247,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled) NKikimrExecutorFlat::TFollowerPartSwitchAux aux; - auto commit = CommitManager->Begin(true, ECommit::Data); + auto commit = CommitManager->Begin(true, ECommit::Data, {}); commit->WaitFollowerGcAck = true; @@ -4564,7 +4581,7 @@ void TExecutor::CommitCompactionChanges( LogicRedo->FlushBatchedLog(); - auto commit = CommitManager->Begin(true, ECommit::Misc); + auto commit = CommitManager->Begin(true, ECommit::Misc, {}); NKikimrExecutorFlat::TTablePartSwitch proto; proto.SetTableId(tableId); diff --git a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp index 308d2a22cb..d3aa1160d5 100644 --- a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp +++ b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp @@ -60,7 +60,7 @@ bool TLogicRedo::TerminateTransaction(TAutoPtr<TSeat> seat, const TActorContext if (CompletionQueue.empty()) { const TTxType txType = seat->Self->GetTxType(); - seat->Self->Terminate(seat->TerminationReason, ctx.MakeFor(ownerID)); + seat->Terminate(seat->TerminationReason, ctx.MakeFor(ownerID)); Counters->Cumulative()[TExecutorCounters::TX_TERMINATED].Increment(1); if (AppTxCounters && txType != UnknownTxType) AppTxCounters->TxCumulative(txType, COUNTER_TT_TERMINATED).Increment(1); @@ -79,7 +79,7 @@ void CompleteRoTransaction(TAutoPtr<TSeat> seat, const TActorContext &ownerCtx, THPTimer completeTimer; LWTRACK(TransactionCompleteBegin, seat->Self->Orbit, seat->UniqID); - seat->Complete(ownerCtx); + seat->Complete(ownerCtx, false); LWTRACK(TransactionCompleteEnd, seat->Self->Orbit, seat->UniqID); const ui64 completeTimeus = ui64(1000000. * completeTimer.Passed()); @@ -145,7 +145,7 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction( if (force || MaxItemsToBatch < 2 || change.Redo.size() > MaxBytesToBatch) { FlushBatchedLog(); - auto commit = CommitManager->Begin(true, ECommit::Redo); + auto commit = CommitManager->Begin(true, ECommit::Redo, seat->GetTxTraceId()); commit->PushTx(seat.Get()); CompletionQueue.push_back({ seat, commit->Step }); @@ -181,10 +181,19 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction( if (Batch->Bytes + change.Redo.size() > MaxBytesToBatch) FlushBatchedLog(); - if (!Batch->Commit) - Batch->Commit = CommitManager->Begin(false, ECommit::Redo); + if (!Batch->Commit) { + Batch->Commit = CommitManager->Begin(false, ECommit::Redo, seat->GetTxTraceId()); + } else { + i64 batchSize = Batch->Bodies.size() + 1; + + Batch->Commit->FirstTx->TxSpan.Attribute("BatchSize", batchSize); + seat->TxSpan.Attribute("Batched", true); + seat->TxSpan.Link(Batch->Commit->FirstTx->GetTxTraceId(), {}); + } + Batch->Commit->PushTx(seat.Get()); + CompletionQueue.push_back({ seat, Batch->Commit->Step }); Batch->Add(std::move(change.Redo), change.Affects); @@ -251,7 +260,7 @@ ui64 TLogicRedo::Confirm(ui32 step, const TActorContext &ctx, const TActorId &ow ++confirmedTransactions; THPTimer completeTimer; LWTRACK(TransactionCompleteBegin, seat->Self->Orbit, seat->UniqID); - entry.InFlyRWTransaction->Complete(ownerCtx); + entry.InFlyRWTransaction->Complete(ownerCtx, true); LWTRACK(TransactionCompleteEnd, seat->Self->Orbit, seat->UniqID); const ui64 completeTimeus = ui64(1000000. * completeTimer.Passed()); @@ -271,7 +280,7 @@ ui64 TLogicRedo::Confirm(ui32 step, const TActorContext &ctx, const TActorId &ow for (auto &x : entry.WaitingTerminatedTransactions) { const TTxType roTxType = x->Self->GetTxType(); - x->Self->Terminate(x->TerminationReason, ownerCtx); + x->Terminate(x->TerminationReason, ownerCtx); Counters->Cumulative()[TExecutorCounters::TX_TERMINATED].Increment(1); if (AppTxCounters && roTxType != UnknownTxType) diff --git a/ydb/library/wilson_ids/wilson.h b/ydb/library/wilson_ids/wilson.h index 9c6c9be331..dec3bd13ee 100644 --- a/ydb/library/wilson_ids/wilson.h +++ b/ydb/library/wilson_ids/wilson.h @@ -47,4 +47,10 @@ namespace NKikimr { }; }; + struct TWilsonTablet { + enum { + Tablet = 15 + }; + }; + } // NKikimr |