summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <[email protected]>2023-06-13 13:40:00 +0300
committerdcherednik <[email protected]>2023-06-13 13:40:00 +0300
commitcbaa5ef3a76c30c6f672f1ae3bfaf6d6d46e19aa (patch)
tree59ec9641a8e4abecc4de7372125233d3de2d1138
parent8fdf1deee61e063769a90a12dfbd84fbfd8abb6e (diff)
Fix race in case of early TEvDiscardSnapshot event.
The KQP session actor can send TEvDiscardSnapshot before it receives the snapshot id from the snapshot manager. Possible situations: - If the snapshot manager is in the AwaitCreation state, we must wait for the snapshot id from the TEvProposeTransactionStatus response and then send DiscardVolatileSnapshot back. - If the snapshot manager has already sent the snapshot id to the session actor and switched to StateRefreshing, but the session actor has not yet received that message, the session actor sends TEvDiscardSnapshot without a snapshot id; the manager then discards its own snapshot and shuts down.
-rw-r--r--ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp64
1 files changed, 55 insertions, 9 deletions
diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
index 744d482f5e1..a7a80150ef1 100644
--- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
+++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
@@ -8,10 +8,16 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+// Streams a snapshot as "[step: <Step>, txId: <TxId>]" so it can be used directly in log messages.
+// The snapshot is taken by const reference: the operator only reads it, so no copy is needed.
+static IOutputStream& operator<<(IOutputStream& out, const NKikimr::NKqp::IKqpGateway::TKqpSnapshot& snap) {
+ out << "[step: " << snap.Step << ", txId: " << snap.TxId << "]";
+ return out;
+}
+
namespace NKikimr {
namespace NKqp {
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
+#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
namespace {
@@ -74,7 +80,7 @@ private:
STATEFN(StateAwaitAcquireResult) {
switch (ev->GetTypeRewrite()) {
hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
- hFunc(TEvents::TEvPoison, HandlePoison);
+ hFunc(TEvents::TEvPoison, Handle);
default:
HandleUnexpectedEvent("AwaitAcquireResult", ev->GetTypeRewrite());
}
@@ -83,12 +89,22 @@ private:
STATEFN(StateAwaitCreation) { // Event dispatch for the AwaitCreation state.
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle);
- hFunc(TEvents::TEvPoison, HandlePoison);
+ hFunc(TEvents::TEvPoison, Handle);
+ hFunc(TEvKqpSnapshot::TEvDiscardSnapshot, HandleAwaitCreation); // a discard received in this state is routed to HandleAwaitCreation
default:
HandleUnexpectedEvent("AwaitCreation", ev->GetTypeRewrite()); // any other event type is reported with the state name
}
}
+ STATEFN(StateCleanup) { // Event dispatch for the Cleanup state (entered from HandleAwaitCreation).
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, HandleCleanup); // the propose result is handled by HandleCleanup in this state
+ hFunc(TEvents::TEvPoison, Handle);
+ default:
+ HandleUnexpectedEvent("Cleanup", ev->GetTypeRewrite()); // any other event type is reported with the state name
+ }
+ }
+
void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr& ev) {
Y_VERIFY(MvccSnapshot);
Y_VERIFY(Tables.empty());
@@ -98,7 +114,7 @@ private:
if (record.GetStatus() == Ydb::StatusIds::SUCCESS) {
Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
- LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " acquired");
+ LOG_D("KqpSnapshotManager: snapshot: " << Snapshot << " acquired");
bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)));
@@ -114,6 +130,32 @@ private:
}
}
+ // Handles the propose result that arrives in the Cleanup state, i.e. after
+ // HandleAwaitCreation switched the actor to StateCleanup on an early discard.
+ // If the snapshot was created successfully it is discarded right away; in every
+ // case the client is sent an InvalidSnapshot / TIMEOUT response and the actor passes away.
+ void HandleCleanup(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
+ Y_VERIFY(!MvccSnapshot);
+
+ using EStatus = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus;
+
+ const auto* msg = ev->Get();
+ const auto status = static_cast<EStatus>(msg->Record.GetStatus());
+
+ if (status == EStatus::ExecComplete && msg->Record.GetStatusCode() == NKikimrIssues::TStatusIds::SUCCESS) {
+ // Remember the created snapshot so that SendDiscard() can refer to it.
+ Snapshot = IKqpGateway::TKqpSnapshot(msg->Record.GetStep(), msg->Record.GetTxId());
+
+ // Log through the TKqpSnapshot stream operator, like the other snapshot log lines in this file.
+ LOG_D("KqpSnapshotManager: snapshot: " << Snapshot << " created in cleanup state. Send discard");
+
+ SendDiscard();
+ }
+
+ // Forward the issues from the propose result and append a marker explaining why the request failed.
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(msg->Record.GetIssues(), issues);
+ issues.AddIssue("stale propose TEvProposeTransactionStatus in cleanup state");
+
+ Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
+ IKqpGateway::TKqpSnapshot::InvalidSnapshot, NKikimrIssues::TStatusIds::TIMEOUT, std::move(issues), std::move(Orbit)));
+
+ PassAway();
+ }
+
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
Y_VERIFY(!MvccSnapshot);
@@ -146,8 +188,8 @@ private:
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvWakeup, HandleRefreshTimeout);
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, HandleRefreshStatus);
- hFunc(TEvKqpSnapshot::TEvDiscardSnapshot, HandleDiscardRequest);
- hFunc(TEvents::TEvPoison, HandlePoison);
+ hFunc(TEvKqpSnapshot::TEvDiscardSnapshot, HandleRefreshing);
+ hFunc(TEvents::TEvPoison, Handle);
default:
HandleUnexpectedEvent("Refreshing", ev->GetTypeRewrite());
}
@@ -185,15 +227,19 @@ private:
}
}
- void HandleDiscardRequest(TEvKqpSnapshot::TEvDiscardSnapshot::TPtr& ev) {
+ void HandleAwaitCreation(TEvKqpSnapshot::TEvDiscardSnapshot::TPtr&) { // Discard request received in the AwaitCreation state; the event payload is not used.
+ LOG_D("KqpSnapshotManager: discarding snapshot in awaitCreation state; goto cleanup");
+ Become(&TThis::StateCleanup); // the pending propose result will then be handled by StateCleanup / HandleCleanup
+ }
+
+ void HandleRefreshing(TEvKqpSnapshot::TEvDiscardSnapshot::TPtr& ev) { // Discard request in the Refreshing state: log both snapshots, send the discard, pass away.
const auto* msg = ev->Get();
- Y_ASSERT(msg->Snapshot == Snapshot);
- LOG_D("KqpSnapshotManager: discarding snapshot; shutting down");
+ LOG_W("KqpSnapshotManager: discarding snapshot; our snapshot: " << Snapshot << " got: " << msg->Snapshot << " shutting down"); // NOTE(review): emitted at warning level on every discard, even when both snapshots match — confirm this is intended
SendDiscard(); // the snapshot in the request is only logged, it is no longer compared with ours
PassAway();
}
- void HandlePoison(TEvents::TEvPoison::TPtr&) {
+ void Handle(TEvents::TEvPoison::TPtr&) { // Poison handler; the event payload is not used.
LOG_D("KqpSnapshotManager: shutting down on timeout");
ReplyErrorAndDie(NKikimrIssues::TStatusIds::TIMEOUT, {}); // called with TIMEOUT status and an empty issue list
}