diff options
author | dcherednik <[email protected]> | 2023-06-13 13:40:00 +0300 |
---|---|---|
committer | dcherednik <[email protected]> | 2023-06-13 13:40:00 +0300 |
commit | cbaa5ef3a76c30c6f672f1ae3bfaf6d6d46e19aa (patch) | |
tree | 59ec9641a8e4abecc4de7372125233d3de2d1138 | |
parent | 8fdf1deee61e063769a90a12dfbd84fbfd8abb6e (diff) |
Fix race in case of early TEvDiscardSnapshot event.
The KQP session actor can send TEvDiscardSnapshot before it has received the snapshot id
from the snapshot manager. Possible situations:
- If the snapshot manager is in the AwaitCreation state, we must
wait until we get the snapshot id from the TEvProposeTransactionStatus response and
then send DiscardVolatileSnapshot back.
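- If the snapshot manager has already sent the snapshot id to the session actor and has
switched to StateRefreshing, but the session actor has not yet received that message and
sends TEvDiscardSnapshot without a snapshot id, we no longer assert that the ids match:
we log a warning, send the discard and shut down.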
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp | 64 |
1 files changed, 55 insertions, 9 deletions
diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp index 744d482f5e1..a7a80150ef1 100644 --- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp +++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp @@ -8,10 +8,16 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +static IOutputStream& operator<<(IOutputStream& out, const NKikimr::NKqp::IKqpGateway::TKqpSnapshot snap) { + out << "[step: " << snap.Step << ", txId: " << snap.TxId << "]"; + return out; +} + namespace NKikimr { namespace NKqp { #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) #define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream) namespace { @@ -74,7 +80,7 @@ private: STATEFN(StateAwaitAcquireResult) { switch (ev->GetTypeRewrite()) { hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle); - hFunc(TEvents::TEvPoison, HandlePoison); + hFunc(TEvents::TEvPoison, Handle); default: HandleUnexpectedEvent("AwaitAcquireResult", ev->GetTypeRewrite()); } @@ -83,12 +89,22 @@ private: STATEFN(StateAwaitCreation) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle); - hFunc(TEvents::TEvPoison, HandlePoison); + hFunc(TEvents::TEvPoison, Handle); + hFunc(TEvKqpSnapshot::TEvDiscardSnapshot, HandleAwaitCreation); default: HandleUnexpectedEvent("AwaitCreation", ev->GetTypeRewrite()); } } + STATEFN(StateCleanup) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, HandleCleanup); + hFunc(TEvents::TEvPoison, Handle); + default: + HandleUnexpectedEvent("Cleanup", ev->GetTypeRewrite()); + } + } + void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr& ev) { Y_VERIFY(MvccSnapshot); 
Y_VERIFY(Tables.empty()); @@ -98,7 +114,7 @@ private: if (record.GetStatus() == Ydb::StatusIds::SUCCESS) { Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); - LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " acquired"); + LOG_D("KqpSnapshotManager: snapshot: " << Snapshot << " acquired"); bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit))); @@ -114,6 +130,32 @@ private: } } + void HandleCleanup(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) { + Y_VERIFY(!MvccSnapshot); + + using EStatus = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus; + + const auto* msg = ev->Get(); + const auto status = static_cast<EStatus>(msg->Record.GetStatus()); + + if (status == EStatus::ExecComplete && msg->Record.GetStatusCode() == NKikimrIssues::TStatusIds::SUCCESS) { + Snapshot = IKqpGateway::TKqpSnapshot(msg->Record.GetStep(), msg->Record.GetTxId()); + + LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " created in cleanup state. 
Send discard"); + + SendDiscard(); + } + + NYql::TIssues issues; + NYql::IssuesFromMessage(msg->Record.GetIssues(), issues); + issues.AddIssue("stale propose TEvProposeTransactionStatus in cleanup state"); + + Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( + IKqpGateway::TKqpSnapshot::InvalidSnapshot, NKikimrIssues::TStatusIds::TIMEOUT, std::move(issues), std::move(Orbit))); + + PassAway(); + } + void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) { Y_VERIFY(!MvccSnapshot); @@ -146,8 +188,8 @@ private: switch (ev->GetTypeRewrite()) { hFunc(TEvents::TEvWakeup, HandleRefreshTimeout); hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, HandleRefreshStatus); - hFunc(TEvKqpSnapshot::TEvDiscardSnapshot, HandleDiscardRequest); - hFunc(TEvents::TEvPoison, HandlePoison); + hFunc(TEvKqpSnapshot::TEvDiscardSnapshot, HandleRefreshing); + hFunc(TEvents::TEvPoison, Handle); default: HandleUnexpectedEvent("Refreshing", ev->GetTypeRewrite()); } @@ -185,15 +227,19 @@ private: } } - void HandleDiscardRequest(TEvKqpSnapshot::TEvDiscardSnapshot::TPtr& ev) { + void HandleAwaitCreation(TEvKqpSnapshot::TEvDiscardSnapshot::TPtr&) { + LOG_D("KqpSnapshotManager: discarding snapshot in awaitCreation state; goto cleanup"); + Become(&TThis::StateCleanup); + } + + void HandleRefreshing(TEvKqpSnapshot::TEvDiscardSnapshot::TPtr& ev) { const auto* msg = ev->Get(); - Y_ASSERT(msg->Snapshot == Snapshot); - LOG_D("KqpSnapshotManager: discarding snapshot; shutting down"); + LOG_W("KqpSnapshotManager: discarding snapshot; our snapshot: " << Snapshot << " got: " << msg->Snapshot << " shutting down"); SendDiscard(); PassAway(); } - void HandlePoison(TEvents::TEvPoison::TPtr&) { + void Handle(TEvents::TEvPoison::TPtr&) { LOG_D("KqpSnapshotManager: shutting down on timeout"); ReplyErrorAndDie(NKikimrIssues::TStatusIds::TIMEOUT, {}); } |