aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-05-18 11:30:23 +0300
committersnaury <snaury@ydb.tech>2023-05-18 11:30:23 +0300
commit46444f475a4f1f19ef04654624be6883ed89a130 (patch)
tree00e14b62ad8df3598b8a3b63b5767d00dd131c54
parent2c69e50ada916875a09f14a9f7548faa91d5306b (diff)
downloadydb-46444f475a4f1f19ef04654624be6883ed89a130.tar.gz
Pass orbit from kqp to long tx service and datashard
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp9
-rw-r--r--ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp11
-rw-r--r--ydb/core/kqp/rm_service/kqp_snapshot_manager.h17
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp8
-rw-r--r--ydb/core/tx/datashard/finish_propose_unit.cpp7
-rw-r--r--ydb/core/tx/datashard/probes.h4
6 files changed, 41 insertions, 15 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index afac59d7681..444536dcd35 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -344,6 +344,7 @@ private:
void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) {
TEvDataShard::TEvProposeTransactionResult* res = ev->Get();
+ ResponseEv->Orbit.Join(res->Orbit);
const ui64 shardId = res->GetOrigin();
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId);
@@ -971,6 +972,7 @@ private:
void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) {
TEvDataShard::TEvProposeTransactionResult* res = ev->Get();
+ ResponseEv->Orbit.Join(res->Orbit);
const ui64 shardId = res->GetOrigin();
LastShard = shardId;
@@ -1549,8 +1551,9 @@ private:
const ui32 flags =
(ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) |
(VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0);
+ std::unique_ptr<TEvDataShard::TEvProposeTransaction> evData;
if (GetSnapshot().IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) {
- ev.reset(new TEvDataShard::TEvProposeTransaction(
+ evData.reset(new TEvDataShard::TEvProposeTransaction(
NKikimrTxDataShard::TX_KIND_DATA,
SelfId(),
TxId,
@@ -1559,13 +1562,15 @@ private:
GetSnapshot().TxId,
flags));
} else {
- ev.reset(new TEvDataShard::TEvProposeTransaction(
+ evData.reset(new TEvDataShard::TEvProposeTransaction(
NKikimrTxDataShard::TX_KIND_DATA,
SelfId(),
TxId,
dataTransaction.SerializeAsString(),
flags));
}
+ ResponseEv->Orbit.Fork(evData->Orbit);
+ ev = std::move(evData);
}
auto traceId = ExecuterSpan.GetTraceId();
diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
index e461d85ab90..744d482f5e1 100644
--- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
+++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
@@ -46,12 +46,13 @@ private:
ClientActorId = ev->Sender;
Tables = ev->Get()->Tables;
MvccSnapshot = ev->Get()->MvccSnapshot;
+ Orbit = std::move(ev->Get()->Orbit);
LOG_D("KqpSnapshotManager: got snapshot request from " << ClientActorId);
if (MvccSnapshot) {
auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
- Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
+ Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database, std::move(Orbit)));
Become(&TThis::StateAwaitAcquireResult);
} else {
@@ -91,6 +92,7 @@ private:
void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr& ev) {
Y_VERIFY(MvccSnapshot);
Y_VERIFY(Tables.empty());
+ Orbit = std::move(ev->Get()->Orbit);
const auto& record = ev->Get()->Record;
if (record.GetStatus() == Ydb::StatusIds::SUCCESS) {
@@ -99,7 +101,7 @@ private:
LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " acquired");
bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
- Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}));
+ Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)));
Y_VERIFY_DEBUG(sent);
PassAway();
@@ -126,7 +128,7 @@ private:
LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " created");
bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
- Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}));
+ Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)));
Y_VERIFY_DEBUG(sent);
Become(&TThis::StateRefreshing);
@@ -224,7 +226,7 @@ private:
void ReplyErrorAndDie(NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues) {
if (CurrentStateFunc() == &TThis::StateAwaitCreation || CurrentStateFunc() == &TThis::StateAwaitAcquireResult) {
Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
- IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues)));
+ IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues), std::move(Orbit)));
} else {
SendDiscard();
}
@@ -236,6 +238,7 @@ private:
TVector<TString> Tables;
TActorId ClientActorId;
IKqpGateway::TKqpSnapshot Snapshot;
+ NLWTrace::TOrbit Orbit;
bool MvccSnapshot = false;
diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h
index 3c63d9dce0a..995d16c0857 100644
--- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h
+++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h
@@ -13,30 +13,35 @@ struct TEvKqpSnapshot {
struct TEvCreateSnapshotRequest : public TEventLocal<TEvCreateSnapshotRequest,
TKqpSnapshotEvents::EvCreateSnapshotRequest>
{
- explicit TEvCreateSnapshotRequest(const TVector<TString>& tables)
+ explicit TEvCreateSnapshotRequest(const TVector<TString>& tables, NLWTrace::TOrbit&& orbit = {})
: Tables(tables)
- , MvccSnapshot(false){}
+ , MvccSnapshot(false)
+ , Orbit(std::move(orbit)) {}
- explicit TEvCreateSnapshotRequest()
+ explicit TEvCreateSnapshotRequest(NLWTrace::TOrbit&& orbit = {})
: Tables({})
- , MvccSnapshot(true){}
+ , MvccSnapshot(true)
+ , Orbit(std::move(orbit)) {}
const TVector<TString> Tables;
const bool MvccSnapshot;
+ NLWTrace::TOrbit Orbit;
};
struct TEvCreateSnapshotResponse : public TEventLocal<TEvCreateSnapshotResponse,
TKqpSnapshotEvents::EvCreateSnapshotResponse>
{
TEvCreateSnapshotResponse(const IKqpGateway::TKqpSnapshot& snapshot,
- NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues)
+ NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues, NLWTrace::TOrbit&& orbit)
: Snapshot(snapshot)
, Status(status)
- , Issues(std::move(issues)) {}
+ , Issues(std::move(issues))
+ , Orbit(std::move(orbit)) {}
const IKqpGateway::TKqpSnapshot Snapshot;
const NKikimrIssues::TStatusIds::EStatusCode Status;
const NYql::TIssues Issues;
+ NLWTrace::TOrbit Orbit;
};
struct TEvDiscardSnapshot : public TEventLocal<TEvDiscardSnapshot, TKqpSnapshotEvents::EvDiscardSnapshot> {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index d7766ca8009..4b576ff2a38 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -504,7 +504,7 @@ public:
auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout);
auto snapMgrActorId = RegisterWithSameMailbox(snapMgr);
- auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables());
+ auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables(), std::move(QueryState->Orbit));
Send(snapMgrActorId, ev.release());
QueryState->TxCtx->SnapshotHandle.ManagingActor = snapMgrActorId;
@@ -523,7 +523,7 @@ public:
auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout);
auto snapMgrActorId = RegisterWithSameMailbox(snapMgr);
- auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>();
+ auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(std::move(QueryState->Orbit));
Send(snapMgrActorId, ev.release());
}
@@ -545,6 +545,10 @@ public:
TTimerGuard timer(this);
auto *response = ev->Get();
+ if (QueryState) {
+ QueryState->Orbit = std::move(response->Orbit);
+ }
+
if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) {
auto& issues = response->Issues;
ReplyQueryError(StatusForSnapshotError(response->Status), "", MessageFromIssues(issues));
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index dafbf08248b..908bfcd853d 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -183,7 +183,12 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size());
if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
- LWTRACK(ProposeTransactionSendResult, op->Orbit);
+ if (res->IsPrepared()) {
+ LWTRACK(ProposeTransactionSendPrepared, op->Orbit);
+ } else {
+ LWTRACK(ProposeTransactionSendResult, op->Orbit);
+ res->Orbit = std::move(op->Orbit);
+ }
if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) {
DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie());
} else if (op->IsImmediate() && op->IsReadOnly() && !op->IsAborted()) {
diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h
index 26bf44e87de..e63f21a9016 100644
--- a/ydb/core/tx/datashard/probes.h
+++ b/ydb/core/tx/datashard/probes.h
@@ -31,6 +31,10 @@
GROUPS("DataShard"), \
TYPES(bool), \
NAMES("success")) \
+ PROBE(ProposeTransactionSendPrepared, \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
PROBE(ProposeTransactionKqpDataExecute, \
GROUPS("DataShard"), \
TYPES(), \