diff options
author | snaury <snaury@ydb.tech> | 2023-05-18 11:30:23 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-05-18 11:30:23 +0300 |
commit | 46444f475a4f1f19ef04654624be6883ed89a130 (patch) | |
tree | 00e14b62ad8df3598b8a3b63b5767d00dd131c54 | |
parent | 2c69e50ada916875a09f14a9f7548faa91d5306b (diff) | |
download | ydb-46444f475a4f1f19ef04654624be6883ed89a130.tar.gz |
Pass orbit from kqp to long tx service and datashard
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_snapshot_manager.h | 17 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_unit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/probes.h | 4 |
6 files changed, 41 insertions, 15 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index afac59d7681..444536dcd35 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -344,6 +344,7 @@ private: void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) { TEvDataShard::TEvProposeTransactionResult* res = ev->Get(); + ResponseEv->Orbit.Join(res->Orbit); const ui64 shardId = res->GetOrigin(); TShardState* shardState = ShardStates.FindPtr(shardId); YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId); @@ -971,6 +972,7 @@ private: void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) { TEvDataShard::TEvProposeTransactionResult* res = ev->Get(); + ResponseEv->Orbit.Join(res->Orbit); const ui64 shardId = res->GetOrigin(); LastShard = shardId; @@ -1549,8 +1551,9 @@ private: const ui32 flags = (ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) | (VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0); + std::unique_ptr<TEvDataShard::TEvProposeTransaction> evData; if (GetSnapshot().IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) { - ev.reset(new TEvDataShard::TEvProposeTransaction( + evData.reset(new TEvDataShard::TEvProposeTransaction( NKikimrTxDataShard::TX_KIND_DATA, SelfId(), TxId, @@ -1559,13 +1562,15 @@ private: GetSnapshot().TxId, flags)); } else { - ev.reset(new TEvDataShard::TEvProposeTransaction( + evData.reset(new TEvDataShard::TEvProposeTransaction( NKikimrTxDataShard::TX_KIND_DATA, SelfId(), TxId, dataTransaction.SerializeAsString(), flags)); } + ResponseEv->Orbit.Fork(evData->Orbit); + ev = std::move(evData); } auto traceId = ExecuterSpan.GetTraceId(); diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp index e461d85ab90..744d482f5e1 100644 --- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp +++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp @@ -46,12 +46,13 @@ private: ClientActorId = ev->Sender; Tables = ev->Get()->Tables; MvccSnapshot = ev->Get()->MvccSnapshot; + Orbit = std::move(ev->Get()->Orbit); LOG_D("KqpSnapshotManager: got snapshot request from " << ClientActorId); if (MvccSnapshot) { auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId()); - Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database)); + Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database, std::move(Orbit))); Become(&TThis::StateAwaitAcquireResult); } else { @@ -91,6 +92,7 @@ private: void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr& ev) { Y_VERIFY(MvccSnapshot); Y_VERIFY(Tables.empty()); + Orbit = std::move(ev->Get()->Orbit); const auto& record = ev->Get()->Record; if (record.GetStatus() == Ydb::StatusIds::SUCCESS) { @@ -99,7 +101,7 @@ private: LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " acquired"); bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( - Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {})); + Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit))); Y_VERIFY_DEBUG(sent); PassAway(); @@ -126,7 +128,7 @@ private: LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " created"); bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( - Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {})); + Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit))); Y_VERIFY_DEBUG(sent); Become(&TThis::StateRefreshing); @@ -224,7 +226,7 @@ private: void ReplyErrorAndDie(NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues) { if (CurrentStateFunc() == &TThis::StateAwaitCreation || CurrentStateFunc() == &TThis::StateAwaitAcquireResult) { Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( - IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues))); + IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues), std::move(Orbit))); } else { SendDiscard(); } @@ -236,6 +238,7 @@ private: TVector<TString> Tables; TActorId ClientActorId; IKqpGateway::TKqpSnapshot Snapshot; + NLWTrace::TOrbit Orbit; bool MvccSnapshot = false; diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h index 3c63d9dce0a..995d16c0857 100644 --- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h +++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h @@ -13,30 +13,35 @@ struct TEvKqpSnapshot { struct TEvCreateSnapshotRequest : public TEventLocal<TEvCreateSnapshotRequest, TKqpSnapshotEvents::EvCreateSnapshotRequest> { - explicit TEvCreateSnapshotRequest(const TVector<TString>& tables) + explicit TEvCreateSnapshotRequest(const TVector<TString>& tables, NLWTrace::TOrbit&& orbit = {}) : Tables(tables) - , MvccSnapshot(false){} + , MvccSnapshot(false) + , Orbit(std::move(orbit)) {} - explicit TEvCreateSnapshotRequest() + explicit TEvCreateSnapshotRequest(NLWTrace::TOrbit&& orbit = {}) : Tables({}) - , MvccSnapshot(true){} + , MvccSnapshot(true) + , Orbit(std::move(orbit)) {} const TVector<TString> Tables; const bool MvccSnapshot; + NLWTrace::TOrbit Orbit; }; struct TEvCreateSnapshotResponse : public TEventLocal<TEvCreateSnapshotResponse, TKqpSnapshotEvents::EvCreateSnapshotResponse> { TEvCreateSnapshotResponse(const IKqpGateway::TKqpSnapshot& snapshot, - NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues) + NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues, NLWTrace::TOrbit&& orbit) : Snapshot(snapshot) , Status(status) - , Issues(std::move(issues)) {} + , Issues(std::move(issues)) + , Orbit(std::move(orbit)) {} const IKqpGateway::TKqpSnapshot Snapshot; const NKikimrIssues::TStatusIds::EStatusCode Status; const NYql::TIssues Issues; + NLWTrace::TOrbit Orbit; }; struct TEvDiscardSnapshot : public TEventLocal<TEvDiscardSnapshot, TKqpSnapshotEvents::EvDiscardSnapshot> { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index d7766ca8009..4b576ff2a38 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -504,7 +504,7 @@ public: auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout); auto snapMgrActorId = RegisterWithSameMailbox(snapMgr); - auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables()); + auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables(), std::move(QueryState->Orbit)); Send(snapMgrActorId, ev.release()); QueryState->TxCtx->SnapshotHandle.ManagingActor = snapMgrActorId; @@ -523,7 +523,7 @@ public: auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout); auto snapMgrActorId = RegisterWithSameMailbox(snapMgr); - auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(); + auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(std::move(QueryState->Orbit)); Send(snapMgrActorId, ev.release()); } @@ -545,6 +545,10 @@ public: TTimerGuard timer(this); auto *response = ev->Get(); + if (QueryState) { + QueryState->Orbit = std::move(response->Orbit); + } + if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) { auto& issues = response->Issues; ReplyQueryError(StatusForSnapshotError(response->Status), "", MessageFromIssues(issues)); diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index dafbf08248b..908bfcd853d 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -183,7 +183,12 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op, DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size()); if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) { - LWTRACK(ProposeTransactionSendResult, op->Orbit); + if (res->IsPrepared()) { + LWTRACK(ProposeTransactionSendPrepared, op->Orbit); + } else { + LWTRACK(ProposeTransactionSendResult, op->Orbit); + res->Orbit = std::move(op->Orbit); + } if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) { DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie()); } else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) { diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h index 26bf44e87de..e63f21a9016 100644 --- a/ydb/core/tx/datashard/probes.h +++ b/ydb/core/tx/datashard/probes.h @@ -31,6 +31,10 @@ GROUPS("DataShard"), \ TYPES(bool), \ NAMES("success")) \ + PROBE(ProposeTransactionSendPrepared, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ PROBE(ProposeTransactionKqpDataExecute, \ GROUPS("DataShard"), \ TYPES(), \ |