diff options
author | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-20 21:03:07 +0300 |
---|---|---|
committer | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-20 21:03:07 +0300 |
commit | 33c65a59cda3a972e9d670f1bc81b864fc263acf (patch) | |
tree | 3c5a49ef91da3418f484e6c4614a0ccb504ba37c | |
parent | cfe2e50abfa852c2d69942c4d255a20147e46343 (diff) | |
download | ydb-33c65a59cda3a972e9d670f1bc81b864fc263acf.tar.gz |
Lock tracking, KIKIMR-14732
ref:ae27b8de7d19e41dd50e93975b2bdc624f0cf577
-rw-r--r-- | ydb/core/protos/long_tx_service.proto | 23 | ||||
-rw-r--r-- | ydb/core/protos/out/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/protos/out/out_long_tx_service.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/long_tx_service_impl.cpp | 535 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/long_tx_service_impl.h | 135 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/long_tx_service_ut.cpp | 127 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/public/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/public/events.h | 61 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/public/lock_handle.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/public/lock_handle.h | 73 |
10 files changed, 961 insertions, 24 deletions
diff --git a/ydb/core/protos/long_tx_service.proto b/ydb/core/protos/long_tx_service.proto index 8d11cc2e0d..ddfab47aa8 100644 --- a/ydb/core/protos/long_tx_service.proto +++ b/ydb/core/protos/long_tx_service.proto @@ -74,3 +74,26 @@ message TEvAcquireReadSnapshotResult { optional uint64 SnapshotTxId = 4; optional string DatabaseName = 5; } + +message TEvSubscribeLock { + optional uint64 LockId = 1; + optional uint32 LockNode = 2; +} + +message TEvSubscribeLockResult { + enum EResult { + RESULT_UNSPECIFIED = 0; + RESULT_LOCK_SUBSCRIBED = 1; + RESULT_LOCK_NOT_FOUND = 2; + RESULT_LOCK_UNAVAILABLE = 3; + } + + optional uint64 LockId = 1; + optional uint32 LockNode = 2; + optional EResult Result = 3; +} + +message TEvUnsubscribeLock { + optional uint64 LockId = 1; + optional uint32 LockNode = 2; +} diff --git a/ydb/core/protos/out/CMakeLists.txt b/ydb/core/protos/out/CMakeLists.txt index 45dfede0cc..ce490450ae 100644 --- a/ydb/core/protos/out/CMakeLists.txt +++ b/ydb/core/protos/out/CMakeLists.txt @@ -15,5 +15,6 @@ target_link_libraries(core-protos-out PUBLIC ) target_sources(core-protos-out PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/out/out.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/protos/out/out_long_tx_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/protos/out/out_sequenceshard.cpp ) diff --git a/ydb/core/protos/out/out_long_tx_service.cpp b/ydb/core/protos/out/out_long_tx_service.cpp new file mode 100644 index 0000000000..ee57e0c411 --- /dev/null +++ b/ydb/core/protos/out/out_long_tx_service.cpp @@ -0,0 +1,7 @@ +#include <ydb/core/protos/long_tx_service.pb.h> + +#include <util/stream/output.h> + +Y_DECLARE_OUT_SPEC(, NKikimrLongTxService::TEvSubscribeLockResult::EResult, stream, value) { + stream << NKikimrLongTxService::TEvSubscribeLockResult::EResult_Name(value); +} diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp index 3e31da2be1..e4f4816d67 100644 --- a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp +++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp @@ -15,13 +15,69 @@ namespace NLongTxService { static constexpr size_t MaxAcquireSnapshotInFlight = 4; static constexpr TDuration AcquireSnapshotBatchDelay = TDuration::MicroSeconds(500); +static constexpr TDuration RemoteLockTimeout = TDuration::Seconds(15); +static constexpr bool InterconnectUndeliveryBroken = true; void TLongTxServiceActor::Bootstrap() { LogPrefix = TStringBuilder() << "TLongTxService [Node " << SelfId().NodeId() << "] "; Become(&TThis::StateWork); } +void TLongTxServiceActor::TSessionSubscribeActor::Subscribe(const TActorId& sessionId) { + Send(sessionId, new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery); +} + +void TLongTxServiceActor::TSessionSubscribeActor::Handle(TEvInterconnect::TEvNodeConnected::TPtr&) { + // nothing +} + +void TLongTxServiceActor::TSessionSubscribeActor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + if (Self) { + Self->OnSessionDisconnected(ev->Sender); + } +} + +void TLongTxServiceActor::TSessionSubscribeActor::Handle(TEvents::TEvUndelivered::TPtr& ev) { + if (Self) { + Self->OnSessionDisconnected(ev->Sender); + } +} + +TLongTxServiceActor::TSessionState& TLongTxServiceActor::SubscribeToSession(const TActorId& sessionId) { + auto it = Sessions.find(sessionId); + if (it != Sessions.end()) { + return it->second; + } + if (!SessionSubscribeActor) { + SessionSubscribeActor = new TSessionSubscribeActor(this); + RegisterWithSameMailbox(SessionSubscribeActor); + } + SessionSubscribeActor->Subscribe(sessionId); + return Sessions[sessionId]; +} + +void TLongTxServiceActor::OnSessionDisconnected(const TActorId& sessionId) { + auto itSession = Sessions.find(sessionId); + if (itSession == Sessions.end()) { + return; + } + auto& session = itSession->second; + for (ui64 lockId : session.SubscribedLocks) { + auto itLock = Locks.find(lockId); + if (itLock != Locks.end()) { + itLock->second.RemoteSubscribers.erase(sessionId); + } + } + session.SubscribedLocks.clear(); + Sessions.erase(itSession); +} + void TLongTxServiceActor::HandlePoison() { + if (SessionSubscribeActor) { + SessionSubscribeActor->PassAway(); + SessionSubscribeActor->Self = nullptr; + SessionSubscribeActor = nullptr; + } PassAway(); } @@ -386,6 +442,289 @@ void TLongTxServiceActor::Handle(TEvPrivate::TEvAcquireSnapshotFinished::TPtr& e } } +void TLongTxServiceActor::Handle(TEvLongTxService::TEvRegisterLock::TPtr& ev) { + auto* msg = ev->Get(); + ui64 lockId = msg->LockId; + TXLOG_DEBUG("Received TEvRegisterLock for LockId# " << lockId); + + Y_VERIFY(lockId, "Unexpected registration of a zero LockId"); + + auto& lock = Locks[lockId]; + ++lock.RefCount; +} + +void TLongTxServiceActor::Handle(TEvLongTxService::TEvUnregisterLock::TPtr& ev) { + auto* msg = ev->Get(); + ui64 lockId = msg->LockId; + TXLOG_DEBUG("Received TEvUnregisterLock for LockId# " << lockId); + + auto it = Locks.find(lockId); + if (it == Locks.end()) { + return; + } + + auto& lock = it->second; + Y_VERIFY(lock.RefCount > 0); + if (0 == --lock.RefCount) { + for (auto& pr : lock.LocalSubscribers) { + Send(pr.first, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, SelfId().NodeId(), + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND), + 0, pr.second); + } + for (auto& prSession : lock.RemoteSubscribers) { + TActorId sessionId = prSession.first; + for (const auto& pr : prSession.second) { + SendViaSession( + sessionId, pr.first, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, SelfId().NodeId(), + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND), + 0, pr.second); + } + auto itSession = Sessions.find(sessionId); + if (itSession != Sessions.end()) { + itSession->second.SubscribedLocks.erase(lockId); + } + } + Locks.erase(it); + } +} + +void TLongTxServiceActor::Handle(TEvLongTxService::TEvSubscribeLock::TPtr& ev) { + auto& record = ev->Get()->Record; + ui64 lockId = record.GetLockId(); + ui32 lockNode = record.GetLockNode(); + TXLOG_DEBUG("Received TEvSubscribeLock from " << ev->Sender << " for LockId# " << lockId << " LockNode# " << lockNode); + + if (!lockId) { + SendViaSession( + ev->InterconnectSession, ev->Sender, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, lockNode, + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE), + 0, ev->Cookie); + return; + } + + // For remote locks we start a proxy subscription + if (lockNode != SelfId().NodeId()) { + auto& node = ConnectProxyNode(lockNode); + if (node.State == EProxyState::Disconnected) { + // Looks like there's no proxy for this node + Send(ev->Sender, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, lockNode, + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE), + 0, ev->Cookie); + return; + } + + auto& lock = node.Locks[lockId]; + if (lock.LockId == 0) { + lock.LockId = lockId; + } + + if (lock.State == EProxyLockState::Subscribed) { + Send(ev->Sender, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, lockNode, + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED), + 0, ev->Cookie); + lock.RepliedSubscribers[ev->Sender] = ev->Cookie; + return; + } + + lock.NewSubscribers[ev->Sender] = ev->Cookie; + lock.RepliedSubscribers.erase(ev->Sender); + + // Send subscription request immediately if node is already connected + if (node.State == EProxyState::Connected && lock.Cookie == 0) { + lock.Cookie = ++LastCookie; + node.CookieToLock[lock.Cookie] = lockId; + SendViaSession( + node.Session, MakeLongTxServiceID(lockNode), + new TEvLongTxService::TEvSubscribeLock(lockId, lockNode), + IEventHandle::FlagTrackDelivery, lock.Cookie); + } + + // Otherwise we wait until the lock is subscribed + return; + } + + auto it = Locks.find(lockId); + if (it == Locks.end()) { + SendViaSession( + ev->InterconnectSession, ev->Sender, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, lockNode, + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND), + 0, ev->Cookie); + return; + } + + auto& lock = it->second; + if (ev->InterconnectSession) { + auto& session = SubscribeToSession(ev->InterconnectSession); + session.SubscribedLocks.insert(lockId); + lock.RemoteSubscribers[ev->InterconnectSession][ev->Sender] = ev->Cookie; + } else { + lock.LocalSubscribers[ev->Sender] = ev->Cookie; + } + + SendViaSession( + ev->InterconnectSession, ev->Sender, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, lockNode, + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED), + 0, ev->Cookie); +} + +void TLongTxServiceActor::Handle(TEvLongTxService::TEvSubscribeLockResult::TPtr& ev) { + auto& record = ev->Get()->Record; + ui64 lockId = record.GetLockId(); + ui32 lockNode = record.GetLockNode(); + auto lockStatus = record.GetResult(); + TXLOG_DEBUG("Received TEvSubscribeLockResult from " << ev->Sender + << " for LockId# " << lockId << " LockNode# " << lockNode + << " LockStatus# " << lockStatus); + + auto* node = ProxyNodes.FindPtr(lockNode); + if (!node || node->State != EProxyState::Connected) { + // Ignore replies from unexpected nodes + return; + } + + if (ev->InterconnectSession != node->Session) { + // Ignore replies that arrived via unexpected sessions + return; + } + + auto itLock = node->Locks.find(lockId); + if (itLock == node->Locks.end()) { + // Ignore replies for locks without subscriptions + return; + } + + auto& lock = itLock->second; + + if (lock.Cookie != ev->Cookie) { + // Ignore replies that don't have a matching cookie + return; + } + + // Make sure lock is removed from expire queue + if (node->LockExpireQueue.Has(&lock)) { + node->LockExpireQueue.Remove(&lock); + } + + // Special handling for successful lock subscriptions + if (lockStatus == NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED) { + lock.State = EProxyLockState::Subscribed; + for (auto& pr : lock.NewSubscribers) { + Send(pr.first, + new TEvLongTxService::TEvSubscribeLockResult(lockId, lockNode, lockStatus), + 0, pr.second); + lock.RepliedSubscribers[pr.first] = pr.second; + } + lock.NewSubscribers.clear(); + return; + } + + // Treat any other status as a confirmed error, reply to all and remove the lock + + for (auto& pr : lock.RepliedSubscribers) { + Send(pr.first, + new TEvLongTxService::TEvSubscribeLockResult(lockId, lockNode, lockStatus), + 0, pr.second); + } + + for (auto& pr : lock.NewSubscribers) { + Send(pr.first, + new TEvLongTxService::TEvSubscribeLockResult(lockId, lockNode, lockStatus), + 0, pr.second); + } + + node->CookieToLock.erase(lock.Cookie); + node->Locks.erase(itLock); +} + +void TLongTxServiceActor::Handle(TEvLongTxService::TEvUnsubscribeLock::TPtr& ev) { + auto& record = ev->Get()->Record; + ui64 lockId = record.GetLockId(); + ui32 lockNode = record.GetLockNode(); + TXLOG_DEBUG("Received TEvUnsubscribeLock from " << ev->Sender << " for LockId# " << lockId << " LockNode# " << lockNode); + + if (!lockId) { + return; + } + + if (lockNode != SelfId().NodeId()) { + auto* node = ProxyNodes.FindPtr(lockNode); + if (!node) { + return; + } + + auto itLock = node->Locks.find(lockId); + if (itLock == node->Locks.end()) { + return; + } + + auto& lock = itLock->second; + lock.NewSubscribers.erase(ev->Sender); + lock.RepliedSubscribers.erase(ev->Sender); + + if (lock.Empty()) { + // We don't need this lock anymore, unsubscribe if the node is already connected + if (node->State == EProxyState::Connected) { + SendViaSession( + node->Session, MakeLongTxServiceID(lockNode), + new TEvLongTxService::TEvUnsubscribeLock(lockId, lockNode)); + } + if (node->LockExpireQueue.Has(&lock)) { + node->LockExpireQueue.Remove(&lock); + } + node->CookieToLock.erase(lock.Cookie); + node->Locks.erase(itLock); + } + + return; + } + + auto it = Locks.find(lockId); + if (it == Locks.end()) { + return; + } + + auto& lock = it->second; + if (ev->InterconnectSession) { + auto itSubscribers = lock.RemoteSubscribers.find(ev->InterconnectSession); + if (itSubscribers != lock.RemoteSubscribers.end()) { + itSubscribers->second.erase(ev->Sender); + if (itSubscribers->second.empty()) { + lock.RemoteSubscribers.erase(itSubscribers); + auto itSession = Sessions.find(ev->InterconnectSession); + if (itSession != Sessions.end()) { + itSession->second.SubscribedLocks.erase(lockId); + } + } + } + } else { + lock.LocalSubscribers.erase(ev->Sender); + } +} + +void TLongTxServiceActor::SendViaSession(const TActorId& sessionId, const TActorId& recipient, + IEventBase* event, ui32 flags, ui64 cookie) +{ + auto ev = MakeHolder<IEventHandle>(recipient, SelfId(), event, flags, cookie); + if (sessionId) { + ev->Rewrite(TEvInterconnect::EvForward, sessionId); + } + TActivationContext::Send(ev.Release()); +} + void TLongTxServiceActor::SendReply(ERequestType type, TActorId sender, ui64 cookie, Ydb::StatusIds::StatusCode status, TStringBuf details) { @@ -414,15 +753,25 @@ void TLongTxServiceActor::SendReplyUnavailable(ERequestType type, TActorId sende SendReply(type, sender, cookie, Ydb::StatusIds::UNAVAILABLE, details); } -void TLongTxServiceActor::SendProxyRequest(ui32 nodeId, ERequestType type, THolder<IEventHandle> ev) { +TLongTxServiceActor::TProxyNodeState& TLongTxServiceActor::ConnectProxyNode(ui32 nodeId) { auto& node = ProxyNodes[nodeId]; - if (node.State == EProxyState::Unknown) { - auto proxy = TActivationContext::InterconnectProxy(nodeId); - if (!proxy) { - return SendReplyUnavailable(type, ev->Sender, ev->Cookie, "Cannot forward request: node unknown"); + if (node.NodeId == 0) { + node.NodeId = nodeId; + } + if (node.State == EProxyState::Disconnected) { + // Node will be left in Disconnected state if there's no proxy + if (auto proxy = TActivationContext::InterconnectProxy(nodeId)) { + Send(proxy, new TEvInterconnect::TEvConnectNode(), IEventHandle::FlagTrackDelivery, nodeId); + node.State = EProxyState::Connecting; } - Send(proxy, new TEvInterconnect::TEvConnectNode(), IEventHandle::FlagTrackDelivery, nodeId); - node.State = EProxyState::Connecting; + } + return node; +} + +void TLongTxServiceActor::SendProxyRequest(ui32 nodeId, ERequestType type, THolder<IEventHandle> ev) { + auto& node = ConnectProxyNode(nodeId); + if (node.State == EProxyState::Disconnected) { + return SendReplyUnavailable(type, ev->Sender, ev->Cookie, "Cannot forward request: node unknown"); } ui64 cookie = ++LastCookie; @@ -460,38 +809,60 @@ void TLongTxServiceActor::SendProxyRequest(ui32 nodeId, ERequestType type, THold } void TLongTxServiceActor::Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) { - auto it = ProxyNodes.find(ev->Get()->NodeId); - if (it == ProxyNodes.end()) { + const ui32 nodeId = ev->Get()->NodeId; + TXLOG_DEBUG("Received TEvNodeConnected for NodeId# " << nodeId << " from session " << ev->Sender); + + auto itNode = ProxyNodes.find(nodeId); + if (itNode == ProxyNodes.end()) { return; } - auto& node = it->second; + + auto& node = itNode->second; if (node.State != EProxyState::Connecting) { return; } + node.State = EProxyState::Connected; node.Session = ev->Sender; + auto pending = std::move(node.Pending); - Y_VERIFY_DEBUG(node.Pending.empty()); + node.Pending.clear(); for (auto& req : pending) { req.Ev->Rewrite(TEvInterconnect::EvForward, node.Session); TActivationContext::Send(req.Ev.Release()); req.Request->State = ERequestState::Sent; } + + // Send subscription requests for all remote locks + for (auto& pr : node.Locks) { + const ui64 lockId = pr.first; + auto& lock = pr.second; + lock.Cookie = ++LastCookie; + node.CookieToLock[lock.Cookie] = lock.Cookie; + SendViaSession( + node.Session, MakeLongTxServiceID(nodeId), + new TEvLongTxService::TEvSubscribeLock(lockId, nodeId), + IEventHandle::FlagTrackDelivery, lock.Cookie); + } } void TLongTxServiceActor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { - OnNodeDisconnected(ev->Get()->NodeId, ev->Sender); + const ui32 nodeId = ev->Get()->NodeId; + TXLOG_DEBUG("Received TEvNodeDisconnected for NodeId# " << nodeId << " from session " << ev->Sender); + OnNodeDisconnected(nodeId, ev->Sender); } void TLongTxServiceActor::OnNodeDisconnected(ui32 nodeId, const TActorId& sender) { - auto it = ProxyNodes.find(nodeId); - if (it == ProxyNodes.end()) { + auto itNode = ProxyNodes.find(nodeId); + if (itNode == ProxyNodes.end()) { return; } - auto& node = it->second; + + auto& node = itNode->second; if (node.Session && node.Session != sender) { return; } + if (node.ActiveRequests) { NYql::TIssues issuesPending; issuesPending.AddIssue("Cannot forward request: node disconnected"); @@ -511,26 +882,116 @@ void TLongTxServiceActor::OnNodeDisconnected(ui32 nodeId, const TActorId& sender } }; - for (auto& pr : node.ActiveRequests) { + auto active = std::move(node.ActiveRequests); + node.ActiveRequests.clear(); + for (auto& pr : active) { auto& req = pr.second; const auto status = isRetriable(req) ? Ydb::StatusIds::UNAVAILABLE : Ydb::StatusIds::UNDETERMINED; const auto& issues = req.State == ERequestState::Pending ? issuesPending : issuesSent; SendReplyIssues(req.Type, req.Sender, req.Cookie, status, issues); } } - ProxyNodes.erase(it); + + if (node.Pending) { + auto pending = std::move(node.Pending); + node.Pending.clear(); + } + + TMonotonic now = TActivationContext::Monotonic(); + + for (auto& pr : node.Locks) { + auto& lock = pr.second; + // For each lock remember the first time it became unavailable + if (lock.State != EProxyLockState::Unavailable) { + lock.State = EProxyLockState::Unavailable; + lock.ExpiresAt = now + RemoteLockTimeout; + node.LockExpireQueue.Add(&lock); + } + // Forget subscribe requests that have been in-flight + if (lock.Cookie != 0) { + node.CookieToLock.erase(lock.Cookie); + lock.Cookie = 0; + } + } + + // See which locks are unavailable for more than timeout + while (auto* lock = node.LockExpireQueue.Top()) { + if (now < lock->ExpiresAt) { + break; + } + RemoveUnavailableLock(node, *lock); + } + + if (node.Locks.empty()) { + // Remove an unnecessary node structure + ProxyNodes.erase(itNode); + return; + } + + node.State = EProxyState::Disconnected; + node.Session = {}; + + // TODO: faster retries with exponential backoff? + Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvReconnect(nodeId)); +} + +void TLongTxServiceActor::Handle(TEvPrivate::TEvReconnect::TPtr& ev) { + ui32 nodeId = ev->Get()->NodeId; + + auto& node = ConnectProxyNode(nodeId); + if (node.State == EProxyState::Disconnected) { + // TODO: handle proxy disappearing from interconnect? + } } void TLongTxServiceActor::Handle(TEvents::TEvUndelivered::TPtr& ev) { - if (ev->Get()->SourceType == TEvInterconnect::EvConnectNode) { + auto* msg = ev->Get(); + TXLOG_DEBUG("Received TEvUndelivered from " << ev->Sender << " cookie " << ev->Cookie + << " type " << msg->SourceType + << " reason " << msg->Reason + << " session " << ev->InterconnectSession); + + if (msg->SourceType == TEvInterconnect::EvConnectNode) { return OnNodeDisconnected(ev->Cookie, ev->Sender); } + + // InterconnectSession will be set if we received this notification from + // a remote node, as opposed to a local notification when message is not + // delivered to interconnect session itself. Session problems are handled + // by separate disconnect notifications. + // FIXME: currently interconnect mock is broken, so we assume all other + // undelivery notifications are coming over the network. It should be + // safe to do, since each request uses a unique cookie that is only valid + // while session is connected. + if (!ev->InterconnectSession && !InterconnectUndeliveryBroken) { + return; + } + auto nodeId = ev->Sender.NodeId(); - auto it = ProxyNodes.find(nodeId); - if (it == ProxyNodes.end()) { + auto itNode = ProxyNodes.find(nodeId); + if (itNode == ProxyNodes.end()) { return; } - auto& node = it->second; + + auto& node = itNode->second; + + if (msg->SourceType == TEvLongTxService::EvSubscribeLock) { + auto itCookie = node.CookieToLock.find(ev->Cookie); + if (itCookie == node.CookieToLock.end()) { + return; + } + + ui64 lockId = itCookie->second; + auto itLock = node.Locks.find(lockId); + if (itLock == node.Locks.end()) { + return; + } + + auto& lock = itLock->second; + RemoveUnavailableLock(node, lock); + return; + } + auto itReq = node.ActiveRequests.find(ev->Cookie); if (itReq == node.ActiveRequests.end()) { return; @@ -540,5 +1001,37 @@ void TLongTxServiceActor::Handle(TEvents::TEvUndelivered::TPtr& ev) { node.ActiveRequests.erase(itReq); } +void TLongTxServiceActor::RemoveUnavailableLock(TProxyNodeState& node, TProxyLockState& lock) { + const ui32 nodeId = node.NodeId; + const ui64 lockId = lock.LockId; + + for (auto& pr : lock.RepliedSubscribers) { + Send(pr.first, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, nodeId, + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE), + 0, pr.second); + } + + for (auto& pr : lock.NewSubscribers) { + Send(pr.first, + new TEvLongTxService::TEvSubscribeLockResult( + lockId, nodeId, + NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE), + 0, pr.second); + } + + if (node.LockExpireQueue.Has(&lock)) { + node.LockExpireQueue.Remove(&lock); + } + + if (lock.Cookie != 0) { + node.CookieToLock.erase(lock.Cookie); + lock.Cookie = 0; + } + + node.Locks.erase(lockId); +} + } // namespace NLongTxService } // namespace NKikimr diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.h b/ydb/core/tx/long_tx_service/long_tx_service_impl.h index a72ddb185e..9ce68680a8 100644 --- a/ydb/core/tx/long_tx_service/long_tx_service_impl.h +++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.h @@ -2,6 +2,7 @@ #include "long_tx_service.h" #include <ydb/core/tx/long_tx_service/public/events.h> +#include <ydb/core/util/intrusive_heap.h> #include <ydb/core/util/ulid.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -51,8 +52,14 @@ namespace NLongTxService { Sent, }; - enum class EProxyState { + enum class EProxyLockState { Unknown, + Subscribed, + Unavailable, + }; + + enum class EProxyState { + Disconnected, Connecting, Connected, }; @@ -69,14 +76,46 @@ namespace NLongTxService { TProxyRequestState* Request = nullptr; }; + struct TProxyLockState { + EProxyLockState State = EProxyLockState::Unknown; + ui64 LockId = 0; + ui64 Cookie = 0; + THashMap<TActorId, ui64> NewSubscribers; + THashMap<TActorId, ui64> RepliedSubscribers; + // Intrusive heap support + size_t HeapIndex = -1; + TMonotonic ExpiresAt; + + bool Empty() const { + return NewSubscribers.empty() && RepliedSubscribers.empty(); + } + + struct THeapIndex { + size_t& operator()(TProxyLockState& value) const { + return value.HeapIndex; + } + }; + + struct THeapLess { + bool operator()(const TProxyLockState& a, const TProxyLockState& b) const { + return a.ExpiresAt < b.ExpiresAt; + } + }; + }; + struct TProxyNodeState { - EProxyState State = EProxyState::Unknown; + EProxyState State = EProxyState::Disconnected; + ui32 NodeId = 0; // Currently connected interconnect session TActorId Session; // Cookie to an active request THashMap<ui64, TProxyRequestState> ActiveRequests; // Pending events, waiting for the node to become connected TVector<TProxyPendingRequest> Pending; + // Locks requested by local subscribers + THashMap<ui64, TProxyLockState> Locks; + TIntrusiveHeap<TProxyLockState, TProxyLockState::THeapIndex, TProxyLockState::THeapLess> LockExpireQueue; + THashMap<ui64, ui64> CookieToLock; }; struct TAcquireSnapshotUserRequest { @@ -103,12 +142,24 @@ namespace NLongTxService { bool FlushPending = false; }; + struct TLockState { + ui64 RefCount = 0; + + THashMap<TActorId, ui64> LocalSubscribers; + THashMap<TActorId, THashMap<TActorId, ui64>> RemoteSubscribers; + }; + + struct TSessionState { + THashSet<ui64> SubscribedLocks; + }; + private: struct TEvPrivate { enum EEv { EvCommitFinished = EventSpaceBegin(TEvents::ES_PRIVATE), EvAcquireSnapshotFlush, EvAcquireSnapshotFinished, + EvReconnect, }; struct TEvCommitFinished : public TEventLocal<TEvCommitFinished, EvCommitFinished> { @@ -151,6 +202,53 @@ namespace NLongTxService { , Issues(std::move(issues)) { } }; + + struct TEvReconnect : public TEventLocal<TEvReconnect, EvReconnect> { + const ui32 NodeId; + + explicit TEvReconnect(ui32 nodeId) + : NodeId(nodeId) + { } + }; + }; + + private: + class TSessionSubscribeActor : public TActor<TSessionSubscribeActor> { + friend class TLongTxServiceActor; + + public: + TSessionSubscribeActor(TLongTxServiceActor* self) + : TActor(&TThis::StateWork) + , Self(self) + { } + + ~TSessionSubscribeActor() { + if (Self) { + Self->SessionSubscribeActor = nullptr; + Self = nullptr; + } + } + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::LONG_TX_SERVICE; + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + } + } + + void Subscribe(const TActorId& sessionId); + void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev); + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev); + void Handle(TEvents::TEvUndelivered::TPtr& ev); + + private: + TLongTxServiceActor* Self; }; public: @@ -160,6 +258,13 @@ namespace NLongTxService { Y_UNUSED(Settings); // TODO } + ~TLongTxServiceActor() { + if (SessionSubscribeActor) { + SessionSubscribeActor->Self = nullptr; + SessionSubscribeActor = nullptr; + } + } + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::LONG_TX_SERVICE; } @@ -167,6 +272,10 @@ namespace NLongTxService { void Bootstrap(); private: + TSessionState& SubscribeToSession(const TActorId& sessionId); + void OnSessionDisconnected(const TActorId& sessionId); + + private: STFUNC(StateWork) { Y_UNUSED(ctx); switch (ev->GetTypeRewrite()) { @@ -182,8 +291,14 @@ namespace NLongTxService { hFunc(TEvLongTxService::TEvAcquireReadSnapshot, Handle); hFunc(TEvPrivate::TEvAcquireSnapshotFlush, Handle); hFunc(TEvPrivate::TEvAcquireSnapshotFinished, Handle); + hFunc(TEvLongTxService::TEvRegisterLock, Handle); + hFunc(TEvLongTxService::TEvUnregisterLock, Handle); + hFunc(TEvLongTxService::TEvSubscribeLock, Handle); + hFunc(TEvLongTxService::TEvSubscribeLockResult, Handle); + hFunc(TEvLongTxService::TEvUnsubscribeLock, Handle); hFunc(TEvInterconnect::TEvNodeConnected, Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvPrivate::TEvReconnect, Handle); hFunc(TEvents::TEvUndelivered, Handle); } } @@ -200,22 +315,35 @@ namespace NLongTxService { void Handle(TEvLongTxService::TEvAcquireReadSnapshot::TPtr& ev); void Handle(TEvPrivate::TEvAcquireSnapshotFlush::TPtr& ev); void Handle(TEvPrivate::TEvAcquireSnapshotFinished::TPtr& ev); + void Handle(TEvLongTxService::TEvRegisterLock::TPtr& ev); + void Handle(TEvLongTxService::TEvUnregisterLock::TPtr& ev); + void Handle(TEvLongTxService::TEvSubscribeLock::TPtr& ev); + void Handle(TEvLongTxService::TEvSubscribeLockResult::TPtr& ev); + void Handle(TEvLongTxService::TEvUnsubscribeLock::TPtr& ev); private: + void SendViaSession(const TActorId& sessionId, const TActorId& recipient, + IEventBase* event, ui32 flags = 0, ui64 cookie = 0); + void SendReply(ERequestType type, TActorId sender, ui64 cookie, Ydb::StatusIds::StatusCode status, TStringBuf details); void SendReplyIssues(ERequestType type, TActorId sender, ui64 cookie, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); void SendReplyUnavailable(ERequestType type, TActorId sender, ui64 cookie, TStringBuf details); + TProxyNodeState& ConnectProxyNode(ui32 nodeId); void SendProxyRequest(ui32 nodeId, ERequestType type, THolder<IEventHandle> ev); void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev); void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void OnNodeDisconnected(ui32 nodeId, const TActorId& sender); + void Handle(TEvPrivate::TEvReconnect::TPtr& ev); void Handle(TEvents::TEvUndelivered::TPtr& ev); private: + void RemoveUnavailableLock(TProxyNodeState& node, TProxyLockState& lock); + + private: void StartCommitActor(TTransaction& tx); void ScheduleAcquireSnapshot(const TString& databaseName, TDatabaseSnapshotState& state); void StartAcquireSnapshotActor(const TString& databaseName, TDatabaseSnapshotState& state); @@ -226,12 +354,15 @@ namespace NLongTxService { private: const TLongTxServiceSettings Settings; TString LogPrefix; + TSessionSubscribeActor* SessionSubscribeActor = nullptr; THashMap<TULID, TTransaction> Transactions; TULIDGenerator IdGenerator; THashMap<ui32, TProxyNodeState> ProxyNodes; THashMap<TString, TDatabaseSnapshotState> DatabaseSnapshots; THashMap<ui64, TAcquireSnapshotState> AcquireSnapshotInFlight; TString DefaultDatabaseName; + THashMap<ui64, TLockState> Locks; + THashMap<TActorId, TSessionState> Sessions; ui64 LastCookie = 0; }; diff --git a/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp b/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp index 1a2375e15f..8504978778 100644 --- a/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp +++ b/ydb/core/tx/long_tx_service/long_tx_service_ut.cpp @@ -1,6 +1,7 @@ #include "long_tx_service.h" #include <ydb/core/tx/long_tx_service/public/events.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/tx/scheme_board/cache.h> #include <ydb/core/testlib/tablet_helpers.h> @@ -170,7 +171,7 @@ Y_UNIT_TEST_SUITE(LongTxService) { // Change txId to a non-existant node and try to commit { auto badTxId = txId; - badTxId.NodeId = 3; + badTxId.NodeId = runtime.GetNodeId(1) + 1; runtime.Send( new IEventHandle(service2, sender2, new TEvLongTxService::TEvCommitTx(badTxId)), @@ -234,6 +235,130 @@ Y_UNIT_TEST_SUITE(LongTxService) { } } + Y_UNIT_TEST(LockSubscribe) { + TTenantTestRuntime runtime(MakeTenantTestConfig(true)); + runtime.SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NLog::PRI_DEBUG); + + TLockHandle handle(123, runtime.GetActorSystem(0)); + + auto node1 = runtime.GetNodeId(0); + auto sender1 = runtime.AllocateEdgeActor(0); + auto service1 = MakeLongTxServiceID(node1); + auto node2 = runtime.GetNodeId(1); + auto sender2 = runtime.AllocateEdgeActor(1); + auto service2 = MakeLongTxServiceID(node2); + + { + runtime.Send( + new IEventHandle(service1, sender1, + new TEvLongTxService::TEvSubscribeLock(987, node1)), + 0, true); + auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender1); + const auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 987u); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND); + } + + { + runtime.Send( + new IEventHandle(service2, sender2, + new TEvLongTxService::TEvSubscribeLock(987, node1)), + 1, true); + auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2); + const auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 987u); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(),node1); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND); + } + + { + runtime.Send( + new IEventHandle(service1, sender1, + new TEvLongTxService::TEvSubscribeLock(123, node1)), + 0, true); + auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender1); + const auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED); + } + + { + runtime.Send( + new IEventHandle(service2, sender2, + new TEvLongTxService::TEvSubscribeLock(123, node1)), + 1, true); + auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2); + const auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_SUBSCRIBED); + } + + { + // move lock handle out, so it unregisters itself + auto movedOut = std::move(handle); + } + + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender1); + const auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND); + } + + { + auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2); + const auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 123u); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_NOT_FOUND); + } + + // Block all cross-node TEvSubscribeLock messages and disconnect instead + size_t disconnectCount = 0; + auto observer = [&](auto& runtime, auto& ev) { + switch (ev->GetTypeRewrite()) { + case TEvLongTxService::TEvSubscribeLock::EventType: { + ui32 node1 = ev->Sender.NodeId(); + ui32 node2 = ev->Recipient.NodeId(); + if (node1 != node2) { + ++disconnectCount; + auto proxy = runtime.GetInterconnectProxy(0, 1); + runtime.Send( + new IEventHandle(proxy, {}, new TEvInterconnect::TEvDisconnect()), + 0, true); + // Advance time on each disconnect, so timeout happens faster + runtime.AdvanceCurrentTime(TDuration::Seconds(5)); + return TTestBasicRuntime::EEventAction::DROP; + } + break; + } + } + return TTestBasicRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(observer); + + // Try to subscribe to the lock of a disconnecting node + // We should eventually get an unavailable result + { + runtime.Send( + new IEventHandle(service2, sender2, + new TEvLongTxService::TEvSubscribeLock(234, node1)), + 1, true); + auto ev = runtime.GrabEdgeEventRethrow<TEvLongTxService::TEvSubscribeLockResult>(sender2); + const auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockId(), 234u); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetLockNode(), node1); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetResult(), NKikimrLongTxService::TEvSubscribeLockResult::RESULT_LOCK_UNAVAILABLE); + } + + // We expect multiple disconnects before unavailable result is returned + UNIT_ASSERT_GE(disconnectCount, 3); + } + } // Y_UNIT_TEST_SUITE(LongTxService) } // namespace NLongTxService diff --git a/ydb/core/tx/long_tx_service/public/CMakeLists.txt b/ydb/core/tx/long_tx_service/public/CMakeLists.txt index 94727e7b74..f5fe906228 100644 --- a/ydb/core/tx/long_tx_service/public/CMakeLists.txt +++ b/ydb/core/tx/long_tx_service/public/CMakeLists.txt @@ -23,5 +23,6 @@ target_link_libraries(tx-long_tx_service-public PUBLIC ) target_sources(tx-long_tx_service-public PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/long_tx_service/public/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/long_tx_service/public/lock_handle.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/long_tx_service/public/types.cpp ) diff --git a/ydb/core/tx/long_tx_service/public/events.h b/ydb/core/tx/long_tx_service/public/events.h index e053c1c857..15612123ac 100644 --- a/ydb/core/tx/long_tx_service/public/events.h +++ b/ydb/core/tx/long_tx_service/public/events.h @@ -25,6 +25,11 @@ namespace NLongTxService { EvAttachColumnShardWritesResult, EvAcquireReadSnapshot, EvAcquireReadSnapshotResult, + EvRegisterLock, + EvUnregisterLock, + EvSubscribeLock, + EvSubscribeLockResult, + EvUnsubscribeLock, EvEnd, }; @@ -187,6 +192,62 @@ namespace NLongTxService { } } }; + + struct TEvRegisterLock + : TEventLocal<TEvRegisterLock, EvRegisterLock> + { + const ui64 LockId; + + explicit TEvRegisterLock(ui64 lockId) + : LockId(lockId) + { } + }; + + struct TEvUnregisterLock + : TEventLocal<TEvUnregisterLock, EvUnregisterLock> + { + const ui64 LockId; + + explicit TEvUnregisterLock(ui64 lockId) + : LockId(lockId) + { } + }; + + struct TEvSubscribeLock + : TEventPB<TEvSubscribeLock, NKikimrLongTxService::TEvSubscribeLock, EvSubscribeLock> + { + TEvSubscribeLock() = default; + + TEvSubscribeLock(ui64 lockId, ui32 lockNode) { + Record.SetLockId(lockId); + Record.SetLockNode(lockNode); + } + }; + + struct TEvSubscribeLockResult + : TEventPB<TEvSubscribeLockResult, NKikimrLongTxService::TEvSubscribeLockResult, EvSubscribeLockResult> + { + using EResult = NKikimrLongTxService::TEvSubscribeLockResult::EResult; + + TEvSubscribeLockResult() = default; + + TEvSubscribeLockResult(ui64 lockId, ui32 lockNode, EResult result) { + Record.SetLockId(lockId); + Record.SetLockNode(lockNode); + Record.SetResult(result); + } + }; + + struct TEvUnsubscribeLock + : TEventPB<TEvUnsubscribeLock, NKikimrLongTxService::TEvUnsubscribeLock, EvUnsubscribeLock> + { + TEvUnsubscribeLock() = default; + + TEvUnsubscribeLock(ui64 lockId, ui32 lockNode) { + Record.SetLockId(lockId); + Record.SetLockNode(lockNode); + } + }; }; } // namespace NLongTxService diff --git a/ydb/core/tx/long_tx_service/public/lock_handle.cpp b/ydb/core/tx/long_tx_service/public/lock_handle.cpp new file mode 100644 index 0000000000..875c387f24 --- /dev/null +++ b/ydb/core/tx/long_tx_service/public/lock_handle.cpp @@ -0,0 +1,22 @@ +#include "lock_handle.h" +#include "events.h" + +#include <library/cpp/actors/core/actorsystem.h> + +namespace NKikimr { +namespace NLongTxService { + + void TLockHandle::Register(ui64 lockId, TActorSystem* as) noexcept { + Y_VERIFY(lockId, "Cannot register a zero lock id"); + Y_VERIFY(as, "Cannot register without a valid actor system"); + as->Send(MakeLongTxServiceID(as->NodeId), new TEvLongTxService::TEvRegisterLock(lockId)); + } + + void TLockHandle::Unregister(ui64 lockId, TActorSystem* as) noexcept { + Y_VERIFY(lockId, "Cannot unregister a zero lock id"); + Y_VERIFY(as, "Cannot unregister without a valid actor system"); + as->Send(MakeLongTxServiceID(as->NodeId), new TEvLongTxService::TEvUnregisterLock(lockId)); + } + +} // namespace NLongTxService +} // namespace NKikimr diff --git a/ydb/core/tx/long_tx_service/public/lock_handle.h b/ydb/core/tx/long_tx_service/public/lock_handle.h new file mode 100644 index 0000000000..fa7647d12a --- /dev/null +++ b/ydb/core/tx/long_tx_service/public/lock_handle.h @@ -0,0 +1,73 @@ +#pragma once +#include <util/system/compiler.h> +#include <util/system/types.h> + +namespace NActors { + + // Avoid include dependency by forward declaring TActorSystem + class TActorSystem; + +} // namespace NActors + +namespace NKikimr { +namespace NLongTxService { + + class TLockHandle { + public: + TLockHandle() noexcept + : LockId(0) + , ActorSystem(nullptr) + { } + + TLockHandle(ui64 lockId, NActors::TActorSystem* as) noexcept + : LockId(lockId) + , ActorSystem(as) + { + if (LockId) { + Register(LockId, ActorSystem); + } + } + + TLockHandle(TLockHandle&& rhs) noexcept + : LockId(rhs.LockId) + , ActorSystem(rhs.ActorSystem) + { + rhs.LockId = 0; + rhs.ActorSystem = nullptr; + } + + ~TLockHandle() noexcept { + if (LockId) { + Unregister(LockId, ActorSystem); + LockId = 0; + } + } + + TLockHandle& operator=(TLockHandle&& rhs) noexcept { + if (Y_LIKELY(this != &rhs)) { + if (LockId) { + Unregister(LockId, ActorSystem); + } + LockId = rhs.LockId; + ActorSystem = rhs.ActorSystem; + rhs.LockId = 0; + rhs.ActorSystem = nullptr; + } + return *this; + } + + ui64 GetLockId() const noexcept { + return LockId; + } + + private: + static void Register(ui64 lockId, NActors::TActorSystem* as) noexcept; + static void Unregister(ui64 lockId, NActors::TActorSystem* as) noexcept; + + private: + ui64 LockId; + NActors::TActorSystem* ActorSystem; + }; + +} // namespace NLongTxService +} // namespace NKikimr |