aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@gmail.com>2022-06-20 21:03:07 +0300
committerAleksei Borzenkov <snaury@gmail.com>2022-06-20 21:03:07 +0300
commit33c65a59cda3a972e9d670f1bc81b864fc263acf (patch)
tree3c5a49ef91da3418f484e6c4614a0ccb504ba37c
parentcfe2e50abfa852c2d69942c4d255a20147e46343 (diff)
downloadydb-33c65a59cda3a972e9d670f1bc81b864fc263acf.tar.gz
Lock tracking, KIKIMR-14732
ref:ae27b8de7d19e41dd50e93975b2bdc624f0cf577
-rw-r--r--ydb/core/protos/long_tx_service.proto23
-rw-r--r--ydb/core/protos/out/CMakeLists.txt1
-rw-r--r--ydb/core/protos/out/out_long_tx_service.cpp7
-rw-r--r--ydb/core/tx/long_tx_service/long_tx_service_impl.cpp535
-rw-r--r--ydb/core/tx/long_tx_service/long_tx_service_impl.h135
-rw-r--r--ydb/core/tx/long_tx_service/long_tx_service_ut.cpp127
-rw-r--r--ydb/core/tx/long_tx_service/public/CMakeLists.txt1
-rw-r--r--ydb/core/tx/long_tx_service/public/events.h61
-rw-r--r--ydb/core/tx/long_tx_service/public/lock_handle.cpp22
-rw-r--r--ydb/core/tx/long_tx_service/public/lock_handle.h73
10 files changed, 961 insertions, 24 deletions
diff --git a/ydb/core/protos/long_tx_service.proto b/ydb/core/protos/long_tx_service.proto
index 8d11cc2e0d..ddfab47aa8 100644
--- a/ydb/core/protos/long_tx_service.proto
+++ b/ydb/core/protos/long_tx_service.proto
@@ -74,3 +74,26 @@ message TEvAcquireReadSnapshotResult {
optional uint64 SnapshotTxId = 4;
optional string DatabaseName = 5;
}
+
+message TEvSubscribeLock {
+ optional uint64 LockId = 1;
+ optional uint32 LockNode = 2;
+}
+
+message TEvSubscribeLockResult {
+ enum EResult {
+ RESULT_UNSPECIFIED = 0;
+ RESULT_LOCK_SUBSCRIBED = 1;
+ RESULT_LOCK_NOT_FOUND = 2;
+ RESULT_LOCK_UNAVAILABLE = 3;
+ }
+
+ optional uint64 LockId = 1;
+ optional uint32 LockNode = 2;
+ optional EResult Result = 3;
+}
+
+message TEvUnsubscribeLock {
+ optional uint64 LockId = 1;
+ optional uint32 LockNode = 2;
+}
diff --git a/ydb/core/protos/out/CMakeLists.txt b/ydb/core/protos/out/CMakeLists.txt
index 45dfede0cc..ce490450ae 100644
--- a/ydb/core/protos/out/CMakeLists.txt
+++ b/ydb/core/protos/out/CMakeLists.txt
@@ -15,5 +15,6 @@ target_link_libraries(core-protos-out PUBLIC
)
target_sources(core-protos-out PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/out/out.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/out/out_long_tx_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/protos/out/out_sequenceshard.cpp
)
diff --git a/ydb/core/protos/out/out_long_tx_service.cpp b/ydb/core/protos/out/out_long_tx_service.cpp
new file mode 100644
index 0000000000..ee57e0c411
--- /dev/null
+++ b/ydb/core/protos/out/out_long_tx_service.cpp
@@ -0,0 +1,7 @@
+#include <ydb/core/protos/long_tx_service.pb.h>
+
+#include <util/stream/output.h>
+
+Y_DECLARE_OUT_SPEC(, NKikimrLongTxService::TEvSubscribeLockResult::EResult, stream, value) {
+ stream << NKikimrLongTxService::TEvSubscribeLockResult::EResult_Name(value);
+}
diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp
index 3e31da2be1..e4f4816d67 100644
--- a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp
+++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp
@@ -15,13 +15,69 @@ namespace NLongTxService {
static constexpr size_t MaxAcquireSnapshotInFlight = 4;
static constexpr TDuration AcquireSnapshotBatchDelay = TDuration::MicroSeconds(500);
+static constexpr TDuration RemoteLockTimeout = TDuration::Seconds(15);
+static constexpr bool InterconnectUndeliveryBroken = true;
void TLongTxServiceActor::Bootstrap() {
LogPrefix = TStringBuilder() << "TLongTxService [Node " << SelfId().NodeId() << "] ";
Become(&TThis::StateWork);
}
+void TLongTxServiceActor::TSessionSubscribeActor::Subscribe(const TActorId& sessionId) {
+ Send(sessionId, new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery);
+}
+
+void TLongTxServiceActor::TSessionSubscribeActor::Handle(TEvInterconnect::TEvNodeConnected::TPtr&) {
+ // nothing
+}
+
+void TLongTxServiceActor::TSessionSubscribeActor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
+ if (Self) {
+ Self->OnSessionDisconnected(ev->Sender);
+ }
+}
+
+void TLongTxServiceActor::TSessionSubscribeActor::Handle(TEvents::TEvUndelivered::TPtr& ev) {
+ if (Self) {
+ Self->OnSessionDisconnected(ev->Sender);
+ }
+}
+
+TLongTxServiceActor::TSessionState& TLongTxServiceActor::SubscribeToSession(const TActorId& sessionId) {
+ auto it = Sessions.find(sessionId);
+ if (it != Sessions.end()) {
+ return it->second;
+ }
+ if (!SessionSubscribeActor) {
+ SessionSubscribeActor = new TSessionSubscribeActor(this);
+ RegisterWithSameMailbox(SessionSubscribeActor);
+ }
+ SessionSubscribeActor->Subscribe(sessionId);
+ return Sessions[sessionId];
+}
+
+void TLongTxServiceActor::OnSessionDisconnected(const TActorId& sessionId) {
+ auto itSession = Sessions.find(sessionId);
+ if (itSession == Sessions.end()) {
+ return;
+ }
+ auto& session = itSession->second;
+ for (ui64 lockId : session.SubscribedLocks) {
+ auto itLock = Locks.find(lockId);
+ if (itLock != Locks.end()) {
+ itLock->second.RemoteSubscribers.erase(sessionId);
+ }
+ }
+ session.SubscribedLocks.clear();
+ Sessions.erase(itSession);
+}
+
void TLongTxServiceActor::HandlePoison() {
+ if (SessionSubscribeActor) {
+ SessionSubscribeActor->PassAway();
+ SessionSubscribeActor->Self = nullptr;
+ SessionSubscribeActor = nullptr;
+ }
PassAway();
}
@@ -386,6 +442,289 @@ void TLongTxServiceActor::Handle(TEvPrivate::TEvAcquireSnapshotFinished::TPtr& e
}
}
+void TLongTxServiceActor::Handle(TEvLongTxService::TEvRegisterLock::TPtr& ev) {
+ auto* msg = ev->Get();
+ ui64 lockId = msg->LockId;
+ TXLOG_DEBUG("Received TEvRegisterLock for LockId# " << lockId);
+
+ Y_VERIFY(lockId, "Unexpected registration of a zero LockId");
+
+ auto& lock = Locks[lockId];
+ ++lock.RefCount;
+}
+
+void TLongTxServiceActor::Handle(TEvLongTxService::TEvUnregisterLock::TPtr& ev) {
+ auto* msg = ev->Get();
+ ui64 lockId = msg->LockId;
+ TXLOG_DEBUG("Received TEvUnregisterLock for LockId# " << lockId);
+
+ auto it = Locks.find(lockId);
+ if (it == Locks.end()) {
+ return;
+ }
+
+ auto& lock = it->second;
+ Y_VERIFY(lock.RefCount > 0);
+ if (0 == --lock.RefCount) {
+ for (auto& pr : lock.LocalSubscribers) {
+ Send(pr.first,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, SelfId().NodeId(),
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND),
+ 0, pr.second);
+ }
+ for (auto& prSession : lock.RemoteSubscribers) {
+ TActorId sessionId = prSession.first;
+ for (const auto& pr : prSession.second) {
+ SendViaSession(
+ sessionId, pr.first,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, SelfId().NodeId(),
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND),
+ 0, pr.second);
+ }
+ auto itSession = Sessions.find(sessionId);
+ if (itSession != Sessions.end()) {
+ itSession->second.SubscribedLocks.erase(lockId);
+ }
+ }
+ Locks.erase(it);
+ }
+}
+
+void TLongTxServiceActor::Handle(TEvLongTxService::TEvSubscribeLock::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+ ui64 lockId = record.GetLockId();
+ ui32 lockNode = record.GetLockNode();
+ TXLOG_DEBUG("Received TEvSubscribeLock from " << ev->Sender << " for LockId# " << lockId << " LockNode# " << lockNode);
+
+ if (!lockId) {
+ SendViaSession(
+ ev->InterconnectSession, ev->Sender,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, lockNode,
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE),
+ 0, ev->Cookie);
+ return;
+ }
+
+ // For remote locks we start a proxy subscription
+ if (lockNode != SelfId().NodeId()) {
+ auto& node = ConnectProxyNode(lockNode);
+ if (node.State == EProxyState::Disconnected) {
+ // Looks like there's no proxy for this node
+ Send(ev->Sender,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, lockNode,
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE),
+ 0, ev->Cookie);
+ return;
+ }
+
+ auto& lock = node.Locks[lockId];
+ if (lock.LockId == 0) {
+ lock.LockId = lockId;
+ }
+
+ if (lock.State == EProxyLockState::Subscribed) {
+ Send(ev->Sender,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, lockNode,
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED),
+ 0, ev->Cookie);
+ lock.RepliedSubscribers[ev->Sender] = ev->Cookie;
+ return;
+ }
+
+ lock.NewSubscribers[ev->Sender] = ev->Cookie;
+ lock.RepliedSubscribers.erase(ev->Sender);
+
+ // Send subscription request immediately if node is already connected
+ if (node.State == EProxyState::Connected && lock.Cookie == 0) {
+ lock.Cookie = ++LastCookie;
+ node.CookieToLock[lock.Cookie] = lockId;
+ SendViaSession(
+ node.Session, MakeLongTxServiceID(lockNode),
+ new TEvLongTxService::TEvSubscribeLock(lockId, lockNode),
+ IEventHandle::FlagTrackDelivery, lock.Cookie);
+ }
+
+ // Otherwise we wait until the lock is subscribed
+ return;
+ }
+
+ auto it = Locks.find(lockId);
+ if (it == Locks.end()) {
+ SendViaSession(
+ ev->InterconnectSession, ev->Sender,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, lockNode,
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND),
+ 0, ev->Cookie);
+ return;
+ }
+
+ auto& lock = it->second;
+ if (ev->InterconnectSession) {
+ auto& session = SubscribeToSession(ev->InterconnectSession);
+ session.SubscribedLocks.insert(lockId);
+ lock.RemoteSubscribers[ev->InterconnectSession][ev->Sender] = ev->Cookie;
+ } else {
+ lock.LocalSubscribers[ev->Sender] = ev->Cookie;
+ }
+
+ SendViaSession(
+ ev->InterconnectSession, ev->Sender,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, lockNode,
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED),
+ 0, ev->Cookie);
+}
+
+void TLongTxServiceActor::Handle(TEvLongTxService::TEvSubscribeLockResult::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+ ui64 lockId = record.GetLockId();
+ ui32 lockNode = record.GetLockNode();
+ auto lockStatus = record.GetResult();
+ TXLOG_DEBUG("Received TEvSubscribeLockResult from " << ev->Sender
+ << " for LockId# " << lockId << " LockNode# " << lockNode
+ << " LockStatus# " << lockStatus);
+
+ auto* node = ProxyNodes.FindPtr(lockNode);
+ if (!node || node->State != EProxyState::Connected) {
+ // Ignore replies from unexpected nodes
+ return;
+ }
+
+ if (ev->InterconnectSession != node->Session) {
+ // Ignore replies that arrived via unexpected sessions
+ return;
+ }
+
+ auto itLock = node->Locks.find(lockId);
+ if (itLock == node->Locks.end()) {
+ // Ignore replies for locks without subscriptions
+ return;
+ }
+
+ auto& lock = itLock->second;
+
+ if (lock.Cookie != ev->Cookie) {
+ // Ignore replies that don't have a matching cookie
+ return;
+ }
+
+ // Make sure lock is removed from expire queue
+ if (node->LockExpireQueue.Has(&lock)) {
+ node->LockExpireQueue.Remove(&lock);
+ }
+
+ // Special handling for successful lock subscriptions
+ if (lockStatus == NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED) {
+ lock.State = EProxyLockState::Subscribed;
+ for (auto& pr : lock.NewSubscribers) {
+ Send(pr.first,
+ new TEvLongTxService::TEvSubscribeLockResult(lockId, lockNode, lockStatus),
+ 0, pr.second);
+ lock.RepliedSubscribers[pr.first] = pr.second;
+ }
+ lock.NewSubscribers.clear();
+ return;
+ }
+
+ // Treat any other status as a confirmed error, reply to all and remove the lock
+
+ for (auto& pr : lock.RepliedSubscribers) {
+ Send(pr.first,
+ new TEvLongTxService::TEvSubscribeLockResult(lockId, lockNode, lockStatus),
+ 0, pr.second);
+ }
+
+ for (auto& pr : lock.NewSubscribers) {
+ Send(pr.first,
+ new TEvLongTxService::TEvSubscribeLockResult(lockId, lockNode, lockStatus),
+ 0, pr.second);
+ }
+
+ node->CookieToLock.erase(lock.Cookie);
+ node->Locks.erase(itLock);
+}
+
+void TLongTxServiceActor::Handle(TEvLongTxService::TEvUnsubscribeLock::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+ ui64 lockId = record.GetLockId();
+ ui32 lockNode = record.GetLockNode();
+ TXLOG_DEBUG("Received TEvUnsubscribeLock from " << ev->Sender << " for LockId# " << lockId << " LockNode# " << lockNode);
+
+ if (!lockId) {
+ return;
+ }
+
+ if (lockNode != SelfId().NodeId()) {
+ auto* node = ProxyNodes.FindPtr(lockNode);
+ if (!node) {
+ return;
+ }
+
+ auto itLock = node->Locks.find(lockId);
+ if (itLock == node->Locks.end()) {
+ return;
+ }
+
+ auto& lock = itLock->second;
+ lock.NewSubscribers.erase(ev->Sender);
+ lock.RepliedSubscribers.erase(ev->Sender);
+
+ if (lock.Empty()) {
+ // We don't need this lock anymore, unsubscribe if the node is already connected
+ if (node->State == EProxyState::Connected) {
+ SendViaSession(
+ node->Session, MakeLongTxServiceID(lockNode),
+ new TEvLongTxService::TEvUnsubscribeLock(lockId, lockNode));
+ }
+ if (node->LockExpireQueue.Has(&lock)) {
+ node->LockExpireQueue.Remove(&lock);
+ }
+ node->CookieToLock.erase(lock.Cookie);
+ node->Locks.erase(itLock);
+ }
+
+ return;
+ }
+
+ auto it = Locks.find(lockId);
+ if (it == Locks.end()) {
+ return;
+ }
+
+ auto& lock = it->second;
+ if (ev->InterconnectSession) {
+ auto itSubscribers = lock.RemoteSubscribers.find(ev->InterconnectSession);
+ if (itSubscribers != lock.RemoteSubscribers.end()) {
+ itSubscribers->second.erase(ev->Sender);
+ if (itSubscribers->second.empty()) {
+ lock.RemoteSubscribers.erase(itSubscribers);
+ auto itSession = Sessions.find(ev->InterconnectSession);
+ if (itSession != Sessions.end()) {
+ itSession->second.SubscribedLocks.erase(lockId);
+ }
+ }
+ }
+ } else {
+ lock.LocalSubscribers.erase(ev->Sender);
+ }
+}
+
+void TLongTxServiceActor::SendViaSession(const TActorId& sessionId, const TActorId& recipient,
+ IEventBase* event, ui32 flags, ui64 cookie)
+{
+ auto ev = MakeHolder<IEventHandle>(recipient, SelfId(), event, flags, cookie);
+ if (sessionId) {
+ ev->Rewrite(TEvInterconnect::EvForward, sessionId);
+ }
+ TActivationContext::Send(ev.Release());
+}
+
void TLongTxServiceActor::SendReply(ERequestType type, TActorId sender, ui64 cookie,
Ydb::StatusIds::StatusCode status, TStringBuf details)
{
@@ -414,15 +753,25 @@ void TLongTxServiceActor::SendReplyUnavailable(ERequestType type, TActorId sende
SendReply(type, sender, cookie, Ydb::StatusIds::UNAVAILABLE, details);
}
-void TLongTxServiceActor::SendProxyRequest(ui32 nodeId, ERequestType type, THolder<IEventHandle> ev) {
+TLongTxServiceActor::TProxyNodeState& TLongTxServiceActor::ConnectProxyNode(ui32 nodeId) {
auto& node = ProxyNodes[nodeId];
- if (node.State == EProxyState::Unknown) {
- auto proxy = TActivationContext::InterconnectProxy(nodeId);
- if (!proxy) {
- return SendReplyUnavailable(type, ev->Sender, ev->Cookie, "Cannot forward request: node unknown");
+ if (node.NodeId == 0) {
+ node.NodeId = nodeId;
+ }
+ if (node.State == EProxyState::Disconnected) {
+ // Node will be left in Disconnected state if there's no proxy
+ if (auto proxy = TActivationContext::InterconnectProxy(nodeId)) {
+ Send(proxy, new TEvInterconnect::TEvConnectNode(), IEventHandle::FlagTrackDelivery, nodeId);
+ node.State = EProxyState::Connecting;
}
- Send(proxy, new TEvInterconnect::TEvConnectNode(), IEventHandle::FlagTrackDelivery, nodeId);
- node.State = EProxyState::Connecting;
+ }
+ return node;
+}
+
+void TLongTxServiceActor::SendProxyRequest(ui32 nodeId, ERequestType type, THolder<IEventHandle> ev) {
+ auto& node = ConnectProxyNode(nodeId);
+ if (node.State == EProxyState::Disconnected) {
+ return SendReplyUnavailable(type, ev->Sender, ev->Cookie, "Cannot forward request: node unknown");
}
ui64 cookie = ++LastCookie;
@@ -460,38 +809,60 @@ void TLongTxServiceActor::SendProxyRequest(ui32 nodeId, ERequestType type, THold
}
void TLongTxServiceActor::Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
- auto it = ProxyNodes.find(ev->Get()->NodeId);
- if (it == ProxyNodes.end()) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ TXLOG_DEBUG("Received TEvNodeConnected for NodeId# " << nodeId << " from session " << ev->Sender);
+
+ auto itNode = ProxyNodes.find(nodeId);
+ if (itNode == ProxyNodes.end()) {
return;
}
- auto& node = it->second;
+
+ auto& node = itNode->second;
if (node.State != EProxyState::Connecting) {
return;
}
+
node.State = EProxyState::Connected;
node.Session = ev->Sender;
+
auto pending = std::move(node.Pending);
- Y_VERIFY_DEBUG(node.Pending.empty());
+ node.Pending.clear();
for (auto& req : pending) {
req.Ev->Rewrite(TEvInterconnect::EvForward, node.Session);
TActivationContext::Send(req.Ev.Release());
req.Request->State = ERequestState::Sent;
}
+
+ // Send subscription requests for all remote locks
+ for (auto& pr : node.Locks) {
+ const ui64 lockId = pr.first;
+ auto& lock = pr.second;
+ lock.Cookie = ++LastCookie;
+ node.CookieToLock[lock.Cookie] = lock.Cookie;
+ SendViaSession(
+ node.Session, MakeLongTxServiceID(nodeId),
+ new TEvLongTxService::TEvSubscribeLock(lockId, nodeId),
+ IEventHandle::FlagTrackDelivery, lock.Cookie);
+ }
}
void TLongTxServiceActor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
- OnNodeDisconnected(ev->Get()->NodeId, ev->Sender);
+ const ui32 nodeId = ev->Get()->NodeId;
+ TXLOG_DEBUG("Received TEvNodeDisconnected for NodeId# " << nodeId << " from session " << ev->Sender);
+ OnNodeDisconnected(nodeId, ev->Sender);
}
void TLongTxServiceActor::OnNodeDisconnected(ui32 nodeId, const TActorId& sender) {
- auto it = ProxyNodes.find(nodeId);
- if (it == ProxyNodes.end()) {
+ auto itNode = ProxyNodes.find(nodeId);
+ if (itNode == ProxyNodes.end()) {
return;
}
- auto& node = it->second;
+
+ auto& node = itNode->second;
if (node.Session && node.Session != sender) {
return;
}
+
if (node.ActiveRequests) {
NYql::TIssues issuesPending;
issuesPending.AddIssue("Cannot forward request: node disconnected");
@@ -511,26 +882,116 @@ void TLongTxServiceActor::OnNodeDisconnected(ui32 nodeId, const TActorId& sender
}
};
- for (auto& pr : node.ActiveRequests) {
+ auto active = std::move(node.ActiveRequests);
+ node.ActiveRequests.clear();
+ for (auto& pr : active) {
auto& req = pr.second;
const auto status = isRetriable(req) ? Ydb::StatusIds::UNAVAILABLE : Ydb::StatusIds::UNDETERMINED;
const auto& issues = req.State == ERequestState::Pending ? issuesPending : issuesSent;
SendReplyIssues(req.Type, req.Sender, req.Cookie, status, issues);
}
}
- ProxyNodes.erase(it);
+
+ if (node.Pending) {
+ auto pending = std::move(node.Pending);
+ node.Pending.clear();
+ }
+
+ TMonotonic now = TActivationContext::Monotonic();
+
+ for (auto& pr : node.Locks) {
+ auto& lock = pr.second;
+ // For each lock remember the first time it became unavailable
+ if (lock.State != EProxyLockState::Unavailable) {
+ lock.State = EProxyLockState::Unavailable;
+ lock.ExpiresAt = now + RemoteLockTimeout;
+ node.LockExpireQueue.Add(&lock);
+ }
+ // Forget subscribe requests that have been in-flight
+ if (lock.Cookie != 0) {
+ node.CookieToLock.erase(lock.Cookie);
+ lock.Cookie = 0;
+ }
+ }
+
+ // See which locks are unavailable for more than timeout
+ while (auto* lock = node.LockExpireQueue.Top()) {
+ if (now < lock->ExpiresAt) {
+ break;
+ }
+ RemoveUnavailableLock(node, *lock);
+ }
+
+ if (node.Locks.empty()) {
+ // Remove an unnecessary node structure
+ ProxyNodes.erase(itNode);
+ return;
+ }
+
+ node.State = EProxyState::Disconnected;
+ node.Session = {};
+
+ // TODO: faster retries with exponential backoff?
+ Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvReconnect(nodeId));
+}
+
+void TLongTxServiceActor::Handle(TEvPrivate::TEvReconnect::TPtr& ev) {
+ ui32 nodeId = ev->Get()->NodeId;
+
+ auto& node = ConnectProxyNode(nodeId);
+ if (node.State == EProxyState::Disconnected) {
+ // TODO: handle proxy disappearing from interconnect?
+ }
}
void TLongTxServiceActor::Handle(TEvents::TEvUndelivered::TPtr& ev) {
- if (ev->Get()->SourceType == TEvInterconnect::EvConnectNode) {
+ auto* msg = ev->Get();
+ TXLOG_DEBUG("Received TEvUndelivered from " << ev->Sender << " cookie " << ev->Cookie
+ << " type " << msg->SourceType
+ << " reason " << msg->Reason
+ << " session " << ev->InterconnectSession);
+
+ if (msg->SourceType == TEvInterconnect::EvConnectNode) {
return OnNodeDisconnected(ev->Cookie, ev->Sender);
}
+
+ // InterconnectSession will be set if we received this notification from
+ // a remote node, as opposed to a local notification when message is not
+ // delivered to interconnect session itself. Session problems are handled
+ // by separate disconnect notifications.
+ // FIXME: currently interconnect mock is broken, so we assume all other
+ // undelivery notifications are coming over the network. It should be
+ // safe to do, since each request uses a unique cookie that is only valid
+ // while session is connected.
+ if (!ev->InterconnectSession && !InterconnectUndeliveryBroken) {
+ return;
+ }
+
auto nodeId = ev->Sender.NodeId();
- auto it = ProxyNodes.find(nodeId);
- if (it == ProxyNodes.end()) {
+ auto itNode = ProxyNodes.find(nodeId);
+ if (itNode == ProxyNodes.end()) {
return;
}
- auto& node = it->second;
+
+ auto& node = itNode->second;
+
+ if (msg->SourceType == TEvLongTxService::EvSubscribeLock) {
+ auto itCookie = node.CookieToLock.find(ev->Cookie);
+ if (itCookie == node.CookieToLock.end()) {
+ return;
+ }
+
+ ui64 lockId = itCookie->second;
+ auto itLock = node.Locks.find(lockId);
+ if (itLock == node.Locks.end()) {
+ return;
+ }
+
+ auto& lock = itLock->second;
+ RemoveUnavailableLock(node, lock);
+ return;
+ }
+
auto itReq = node.ActiveRequests.find(ev->Cookie);
if (itReq == node.ActiveRequests.end()) {
return;
@@ -540,5 +1001,37 @@ void TLongTxServiceActor::Handle(TEvents::TEvUndelivered::TPtr& ev) {
node.ActiveRequests.erase(itReq);
}
+void TLongTxServiceActor::RemoveUnavailableLock(TProxyNodeState& node, TProxyLockState& lock) {
+ const ui32 nodeId = node.NodeId;
+ const ui64 lockId = lock.LockId;
+
+ for (auto& pr : lock.RepliedSubscribers) {
+ Send(pr.first,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, nodeId,
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE),
+ 0, pr.second);
+ }
+
+ for (auto& pr : lock.NewSubscribers) {
+ Send(pr.first,
+ new TEvLongTxService::TEvSubscribeLockResult(
+ lockId, nodeId,
+ NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE),
+ 0, pr.second);
+ }
+
+ if (node.LockExpireQueue.Has(&lock)) {
+ node.LockExpireQueue.Remove(&lock);
+ }
+
+ if (lock.Cookie != 0) {
+ node.CookieToLock.erase(lock.Cookie);
+ lock.Cookie = 0;
+ }
+
+ node.Locks.erase(lockId);
+}
+
} // namespace NLongTxService
} // namespace NKikimr
diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.h b/ydb/core/tx/long_tx_service/long_tx_service_impl.h
index a72ddb185e..9ce68680a8 100644
--- a/ydb/core/tx/long_tx_service/long_tx_service_impl.h
+++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.h
@@ -2,6 +2,7 @@
#include "long_tx_service.h"
#include <ydb/core/tx/long_tx_service/public/events.h>
+#include <ydb/core/util/intrusive_heap.h>
#include <ydb/core/util/ulid.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -51,8 +52,14 @@ namespace NLongTxService {
Sent,
};
- enum class EProxyState {
+ enum class EProxyLockState {
Unknown,
+ Subscribed,
+ Unavailable,
+ };
+
+ enum class EProxyState {
+ Disconnected,
Connecting,
Connected,
};
@@ -69,14 +76,46 @@ namespace NLongTxService {
TProxyRequestState* Request = nullptr;
};
+ struct TProxyLockState {
+ EProxyLockState State = EProxyLockState::Unknown;
+ ui64 LockId = 0;
+ ui64 Cookie = 0;
+ THashMap<TActorId, ui64> NewSubscribers;
+ THashMap<TActorId, ui64> RepliedSubscribers;
+ // Intrusive heap support
+ size_t HeapIndex = -1;
+ TMonotonic ExpiresAt;
+
+ bool Empty() const {
+ return NewSubscribers.empty() && RepliedSubscribers.empty();
+ }
+
+ struct THeapIndex {
+ size_t& operator()(TProxyLockState& value) const {
+ return value.HeapIndex;
+ }
+ };
+
+ struct THeapLess {
+ bool operator()(const TProxyLockState& a, const TProxyLockState& b) const {
+ return a.ExpiresAt < b.ExpiresAt;
+ }
+ };
+ };
+
struct TProxyNodeState {
- EProxyState State = EProxyState::Unknown;
+ EProxyState State = EProxyState::Disconnected;
+ ui32 NodeId = 0;
// Currently connected interconnect session
TActorId Session;
// Cookie to an active request
THashMap<ui64, TProxyRequestState> ActiveRequests;
// Pending events, waiting for the node to become connected
TVector<TProxyPendingRequest> Pending;
+ // Locks requested by local subscribers
+ THashMap<ui64, TProxyLockState> Locks;
+ TIntrusiveHeap<TProxyLockState, TProxyLockState::THeapIndex, TProxyLockState::THeapLess> LockExpireQueue;
+ THashMap<ui64, ui64> CookieToLock;
};
struct TAcquireSnapshotUserRequest {
@@ -103,12 +142,24 @@ namespace NLongTxService {
bool FlushPending = false;
};
+ struct TLockState {
+ ui64 RefCount = 0;
+
+ THashMap<TActorId, ui64> LocalSubscribers;
+ THashMap<TActorId, THashMap<TActorId, ui64>> RemoteSubscribers;
+ };
+
+ struct TSessionState {
+ THashSet<ui64> SubscribedLocks;
+ };
+
private:
struct TEvPrivate {
enum EEv {
EvCommitFinished = EventSpaceBegin(TEvents::ES_PRIVATE),
EvAcquireSnapshotFlush,
EvAcquireSnapshotFinished,
+ EvReconnect,
};
struct TEvCommitFinished : public TEventLocal<TEvCommitFinished, EvCommitFinished> {
@@ -151,6 +202,53 @@ namespace NLongTxService {
, Issues(std::move(issues))
{ }
};
+
+ struct TEvReconnect : public TEventLocal<TEvReconnect, EvReconnect> {
+ const ui32 NodeId;
+
+ explicit TEvReconnect(ui32 nodeId)
+ : NodeId(nodeId)
+ { }
+ };
+ };
+
+ private:
+ class TSessionSubscribeActor : public TActor<TSessionSubscribeActor> {
+ friend class TLongTxServiceActor;
+
+ public:
+ TSessionSubscribeActor(TLongTxServiceActor* self)
+ : TActor(&TThis::StateWork)
+ , Self(self)
+ { }
+
+ ~TSessionSubscribeActor() {
+ if (Self) {
+ Self->SessionSubscribeActor = nullptr;
+ Self = nullptr;
+ }
+ }
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::LONG_TX_SERVICE;
+ }
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ }
+ }
+
+ void Subscribe(const TActorId& sessionId);
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev);
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
+ void Handle(TEvents::TEvUndelivered::TPtr& ev);
+
+ private:
+ TLongTxServiceActor* Self;
};
public:
@@ -160,6 +258,13 @@ namespace NLongTxService {
Y_UNUSED(Settings); // TODO
}
+ ~TLongTxServiceActor() {
+ if (SessionSubscribeActor) {
+ SessionSubscribeActor->Self = nullptr;
+ SessionSubscribeActor = nullptr;
+ }
+ }
+
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::LONG_TX_SERVICE;
}
@@ -167,6 +272,10 @@ namespace NLongTxService {
void Bootstrap();
private:
+ TSessionState& SubscribeToSession(const TActorId& sessionId);
+ void OnSessionDisconnected(const TActorId& sessionId);
+
+ private:
STFUNC(StateWork) {
Y_UNUSED(ctx);
switch (ev->GetTypeRewrite()) {
@@ -182,8 +291,14 @@ namespace NLongTxService {
hFunc(TEvLongTxService::TEvAcquireReadSnapshot, Handle);
hFunc(TEvPrivate::TEvAcquireSnapshotFlush, Handle);
hFunc(TEvPrivate::TEvAcquireSnapshotFinished, Handle);
+ hFunc(TEvLongTxService::TEvRegisterLock, Handle);
+ hFunc(TEvLongTxService::TEvUnregisterLock, Handle);
+ hFunc(TEvLongTxService::TEvSubscribeLock, Handle);
+ hFunc(TEvLongTxService::TEvSubscribeLockResult, Handle);
+ hFunc(TEvLongTxService::TEvUnsubscribeLock, Handle);
hFunc(TEvInterconnect::TEvNodeConnected, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvPrivate::TEvReconnect, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
}
}
@@ -200,22 +315,35 @@ namespace NLongTxService {
void Handle(TEvLongTxService::TEvAcquireReadSnapshot::TPtr& ev);
void Handle(TEvPrivate::TEvAcquireSnapshotFlush::TPtr& ev);
void Handle(TEvPrivate::TEvAcquireSnapshotFinished::TPtr& ev);
+ void Handle(TEvLongTxService::TEvRegisterLock::TPtr& ev);
+ void Handle(TEvLongTxService::TEvUnregisterLock::TPtr& ev);
+ void Handle(TEvLongTxService::TEvSubscribeLock::TPtr& ev);
+ void Handle(TEvLongTxService::TEvSubscribeLockResult::TPtr& ev);
+ void Handle(TEvLongTxService::TEvUnsubscribeLock::TPtr& ev);
private:
+ void SendViaSession(const TActorId& sessionId, const TActorId& recipient,
+ IEventBase* event, ui32 flags = 0, ui64 cookie = 0);
+
void SendReply(ERequestType type, TActorId sender, ui64 cookie,
Ydb::StatusIds::StatusCode status, TStringBuf details);
void SendReplyIssues(ERequestType type, TActorId sender, ui64 cookie,
Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void SendReplyUnavailable(ERequestType type, TActorId sender, ui64 cookie, TStringBuf details);
+ TProxyNodeState& ConnectProxyNode(ui32 nodeId);
void SendProxyRequest(ui32 nodeId, ERequestType type, THolder<IEventHandle> ev);
void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void OnNodeDisconnected(ui32 nodeId, const TActorId& sender);
+ void Handle(TEvPrivate::TEvReconnect::TPtr& ev);
void Handle(TEvents::TEvUndelivered::TPtr& ev);
private:
+ void RemoveUnavailableLock(TProxyNodeState& node, TProxyLockState& lock);
+
+ private:
void StartCommitActor(TTransaction& tx);
void ScheduleAcquireSnapshot(const TString& databaseName, TDatabaseSnapshotState& state);
void StartAcquireSnapshotActor(const TString& databaseName, TDatabaseSnapshotState& state);
@@ -226,12 +354,15 @@ namespace NLongTxService {
private:
const TLongTxServiceSettings Settings;
TString LogPrefix;
+ TSessionSubscribeActor* SessionSubscribeActor = nullptr;
THashMap<TULID, TTransaction> Transactions;
TULIDGenerator IdGenerator;
THashMap<ui32, TProxyNodeState> ProxyNodes;
THashMap<TString, TDatabaseSnapshotState> DatabaseSnapshots;
THashMap<ui64, TAcquireSnapshotState> AcquireSnapshotInFlight;
TString DefaultDatabaseName;
+ THashMap<ui64, TLockState> Locks;
+ THashMap<TActorId, TSessionState> Sessions;
ui64 LastCookie = 0;
};
diff --git a/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp b/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp
index 1a2375e15f..8504978778 100644
--- a/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp
+++ b/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp
@@ -1,6 +1,7 @@
#include "long_tx_service.h"
#include <ydb/core/tx/long_tx_service/public/events.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/scheme_board/cache.h>
#include <ydb/core/testlib/tablet_helpers.h>
@@ -170,7 +171,7 @@ Y_UNIT_TEST_SUITE(LongTxService) {
// Change txId to a non-existant node and try to commit
{
auto badTxId = txId;
- badTxId.NodeId = 3;
+ badTxId.NodeId = runtime.GetNodeId(1) + 1;
runtime.Send(
new IEventHandle(service2, sender2,
new TEvLongTxService::TEvCommitTx(badTxId)),
@@ -234,6 +235,130 @@ Y_UNIT_TEST_SUITE(LongTxService) {
}
}
+ Y_UNIT_TEST(LockSubscribe) {
+ TTenantTestRuntime runtime(MakeTenantTestConfig(true));
+ runtime.SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NLog::PRI_DEBUG);
+
+ TLockHandle handle(123, runtime.GetActorSystem(0));
+
+ auto node1 = runtime.GetNodeId(0);
+ auto sender1 = runtime.AllocateEdgeActor(0);
+ auto service1 = MakeLongTxServiceID(node1);
+ auto node2 = runtime.GetNodeId(1);
+ auto sender2 = runtime.AllocateEdgeActor(1);
+ auto service2 = MakeLongTxServiceID(node2);
+
+ {
+ runtime.Send(
+ new IEventHandle(service1, sender1,
+ new TEvLongTxService::TEvSubscribeLock(987, node1)),
+ 0, true);
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender1);
+ const auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 987u);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND);
+ }
+
+ {
+ runtime.Send(
+ new IEventHandle(service2, sender2,
+ new TEvLongTxService::TEvSubscribeLock(987, node1)),
+ 1, true);
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2);
+ const auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 987u);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(),node1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND);
+ }
+
+ {
+ runtime.Send(
+ new IEventHandle(service1, sender1,
+ new TEvLongTxService::TEvSubscribeLock(123, node1)),
+ 0, true);
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender1);
+ const auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED);
+ }
+
+ {
+ runtime.Send(
+ new IEventHandle(service2, sender2,
+ new TEvLongTxService::TEvSubscribeLock(123, node1)),
+ 1, true);
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2);
+ const auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED);
+ }
+
+ {
+ // move lock handle out, so it unregisters itself
+ auto movedOut = std::move(handle);
+ }
+
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender1);
+ const auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND);
+ }
+
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2);
+ const auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND);
+ }
+
+ // Block all cross-node TEvSubscribeLock messages and disconnect instead
+ size_t disconnectCount = 0;
+ auto observer = [&](auto& runtime, auto& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvLongTxService::TEvSubscribeLock::EventType: {
+ ui32 node1 = ev->Sender.NodeId();
+ ui32 node2 = ev->Recipient.NodeId();
+ if (node1 != node2) {
+ ++disconnectCount;
+ auto proxy = runtime.GetInterconnectProxy(0, 1);
+ runtime.Send(
+ new IEventHandle(proxy, {}, new TEvInterconnect::TEvDisconnect()),
+ 0, true);
+ // Advance time on each disconnect, so timeout happens faster
+ runtime.AdvanceCurrentTime(TDuration::Seconds(5));
+ return TTestBasicRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestBasicRuntime::EEventAction::PROCESS;
+ };
+ runtime.SetObserverFunc(observer);
+
+ // Try to subscribe to the lock of a disconnecting node
+ // We should eventually get an unavailable result
+ {
+ runtime.Send(
+ new IEventHandle(service2, sender2,
+ new TEvLongTxService::TEvSubscribeLock(234, node1)),
+ 1, true);
+ auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2);
+ const auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 234u);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE);
+ }
+
+ // We expect multiple disconnects before unavailable result is returned
+ UNIT_ASSERT_GE(disconnectCount, 3);
+ }
+
} // Y_UNIT_TEST_SUITE(LongTxService)
} // namespace NLongTxService
diff --git a/ydb/core/tx/long_tx_service/public/CMakeLists.txt b/ydb/core/tx/long_tx_service/public/CMakeLists.txt
index 94727e7b74..f5fe906228 100644
--- a/ydb/core/tx/long_tx_service/public/CMakeLists.txt
+++ b/ydb/core/tx/long_tx_service/public/CMakeLists.txt
@@ -23,5 +23,6 @@ target_link_libraries(tx-long_tx_service-public PUBLIC
)
target_sources(tx-long_tx_service-public PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/long_tx_service/public/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/long_tx_service/public/lock_handle.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/long_tx_service/public/types.cpp
)
diff --git a/ydb/core/tx/long_tx_service/public/events.h b/ydb/core/tx/long_tx_service/public/events.h
index e053c1c857..15612123ac 100644
--- a/ydb/core/tx/long_tx_service/public/events.h
+++ b/ydb/core/tx/long_tx_service/public/events.h
@@ -25,6 +25,11 @@ namespace NLongTxService {
EvAttachColumnShardWritesResult,
EvAcquireReadSnapshot,
EvAcquireReadSnapshotResult,
+ EvRegisterLock,
+ EvUnregisterLock,
+ EvSubscribeLock,
+ EvSubscribeLockResult,
+ EvUnsubscribeLock,
EvEnd,
};
@@ -187,6 +192,62 @@ namespace NLongTxService {
}
}
};
+
+ struct TEvRegisterLock
+ : TEventLocal<TEvRegisterLock, EvRegisterLock>
+ {
+ const ui64 LockId;
+
+ explicit TEvRegisterLock(ui64 lockId)
+ : LockId(lockId)
+ { }
+ };
+
+ struct TEvUnregisterLock
+ : TEventLocal<TEvUnregisterLock, EvUnregisterLock>
+ {
+ const ui64 LockId;
+
+ explicit TEvUnregisterLock(ui64 lockId)
+ : LockId(lockId)
+ { }
+ };
+
+ struct TEvSubscribeLock
+ : TEventPB<TEvSubscribeLock, NKikimrLongTxService::TEvSubscribeLock, EvSubscribeLock>
+ {
+ TEvSubscribeLock() = default;
+
+ TEvSubscribeLock(ui64 lockId, ui32 lockNode) {
+ Record.SetLockId(lockId);
+ Record.SetLockNode(lockNode);
+ }
+ };
+
+ struct TEvSubscribeLockResult
+ : TEventPB<TEvSubscribeLockResult, NKikimrLongTxService::TEvSubscribeLockResult, EvSubscribeLockResult>
+ {
+ using EResult = NKikimrLongTxService::TEvSubscribeLockResult::EResult;
+
+ TEvSubscribeLockResult() = default;
+
+ TEvSubscribeLockResult(ui64 lockId, ui32 lockNode, EResult result) {
+ Record.SetLockId(lockId);
+ Record.SetLockNode(lockNode);
+ Record.SetResult(result);
+ }
+ };
+
+ struct TEvUnsubscribeLock
+ : TEventPB<TEvUnsubscribeLock, NKikimrLongTxService::TEvUnsubscribeLock, EvUnsubscribeLock>
+ {
+ TEvUnsubscribeLock() = default;
+
+ TEvUnsubscribeLock(ui64 lockId, ui32 lockNode) {
+ Record.SetLockId(lockId);
+ Record.SetLockNode(lockNode);
+ }
+ };
};
} // namespace NLongTxService
diff --git a/ydb/core/tx/long_tx_service/public/lock_handle.cpp b/ydb/core/tx/long_tx_service/public/lock_handle.cpp
new file mode 100644
index 0000000000..875c387f24
--- /dev/null
+++ b/ydb/core/tx/long_tx_service/public/lock_handle.cpp
@@ -0,0 +1,22 @@
+#include "lock_handle.h"
+#include "events.h"
+
+#include <library/cpp/actors/core/actorsystem.h>
+
+namespace NKikimr {
+namespace NLongTxService {
+
+ void TLockHandle::Register(ui64 lockId, TActorSystem* as) noexcept {
+ Y_VERIFY(lockId, "Cannot register a zero lock id");
+ Y_VERIFY(as, "Cannot register without a valid actor system");
+ as->Send(MakeLongTxServiceID(as->NodeId), new TEvLongTxService::TEvRegisterLock(lockId));
+ }
+
+ void TLockHandle::Unregister(ui64 lockId, TActorSystem* as) noexcept {
+ Y_VERIFY(lockId, "Cannot unregister a zero lock id");
+ Y_VERIFY(as, "Cannot unregister without a valid actor system");
+ as->Send(MakeLongTxServiceID(as->NodeId), new TEvLongTxService::TEvUnregisterLock(lockId));
+ }
+
+} // namespace NLongTxService
+} // namespace NKikimr
diff --git a/ydb/core/tx/long_tx_service/public/lock_handle.h b/ydb/core/tx/long_tx_service/public/lock_handle.h
new file mode 100644
index 0000000000..fa7647d12a
--- /dev/null
+++ b/ydb/core/tx/long_tx_service/public/lock_handle.h
@@ -0,0 +1,73 @@
+#pragma once
+#include <util/system/compiler.h>
+#include <util/system/types.h>
+
+namespace NActors {
+
+ // Avoid include dependency by forward declaring TActorSystem
+ class TActorSystem;
+
+} // namespace NActors
+
+namespace NKikimr {
+namespace NLongTxService {
+
+ class TLockHandle {
+ public:
+ TLockHandle() noexcept
+ : LockId(0)
+ , ActorSystem(nullptr)
+ { }
+
+ TLockHandle(ui64 lockId, NActors::TActorSystem* as) noexcept
+ : LockId(lockId)
+ , ActorSystem(as)
+ {
+ if (LockId) {
+ Register(LockId, ActorSystem);
+ }
+ }
+
+ TLockHandle(TLockHandle&& rhs) noexcept
+ : LockId(rhs.LockId)
+ , ActorSystem(rhs.ActorSystem)
+ {
+ rhs.LockId = 0;
+ rhs.ActorSystem = nullptr;
+ }
+
+ ~TLockHandle() noexcept {
+ if (LockId) {
+ Unregister(LockId, ActorSystem);
+ LockId = 0;
+ }
+ }
+
+ TLockHandle& operator=(TLockHandle&& rhs) noexcept {
+ if (Y_LIKELY(this != &rhs)) {
+ if (LockId) {
+ Unregister(LockId, ActorSystem);
+ }
+ LockId = rhs.LockId;
+ ActorSystem = rhs.ActorSystem;
+ rhs.LockId = 0;
+ rhs.ActorSystem = nullptr;
+ }
+ return *this;
+ }
+
+ ui64 GetLockId() const noexcept {
+ return LockId;
+ }
+
+ private:
+ static void Register(ui64 lockId, NActors::TActorSystem* as) noexcept;
+ static void Unregister(ui64 lockId, NActors::TActorSystem* as) noexcept;
+
+ private:
+ ui64 LockId;
+ NActors::TActorSystem* ActorSystem;
+ };
+
+} // namespace NLongTxService
+} // namespace NKikimr