diff options
author | alexvru <alexvru@ydb.tech> | 2023-03-10 10:35:51 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-03-10 10:35:51 +0300 |
commit | 1287dc8c20edceeff5805907677892e62da2a68a (patch) | |
tree | 8b16accf2a66a00701b8ff5248ab2e7088d0d686 /library/cpp/actors/interconnect/interconnect_handshake.cpp | |
parent | 59c758d9fa32f1dc467824707fb8cdf91e8bb731 (diff) | |
download | ydb-1287dc8c20edceeff5805907677892e62da2a68a.tar.gz |
Improve coroutine actor interface
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_handshake.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 9926e8fa1f..467c0fd16e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -96,13 +96,13 @@ namespace NActors { bool ResolveTimedOut = false; THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); - TInstant Deadline; + TMonotonic Deadline; TActorId HandshakeBroker; public: THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) - : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors + : TActorCoroImpl(StackSize, true) , Common(std::move(common)) , SelfVirtualId(self) , PeerVirtualId(peer) @@ -119,7 +119,7 @@ namespace NActors { } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) - : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors + : TActorCoroImpl(StackSize, true) , Common(std::move(common)) , Socket(std::move(socket)) , HandshakeKind("incoming handshake") @@ -169,7 +169,7 @@ namespace NActors { timeout *= 0.9; } - Deadline = Now() + timeout; + Deadline = TActivationContext::Monotonic() + timeout; Schedule(Deadline, new TEvents::TEvWakeup); try { @@ -263,6 +263,9 @@ namespace NActors { case TEvPollerReady::EventType: break; + case TEvents::TSystem::Poison: + throw TStopCoroutineException(); + default: Y_FAIL("unexpected event 0x%08" PRIx32, type); } @@ -835,13 +838,13 @@ namespace NActors { } template <typename TEvent> - THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) { + THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline); } template <typename T1, typename T2, typename... TEvents> - THolder<IEventHandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) { + THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline); } @@ -895,11 +898,12 @@ namespace NActors { void Connect(bool updatePeerAddr) { // issue request to a nameservice to resolve peer node address - Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, Deadline)); + const auto mono = TActivationContext::Monotonic(); + Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono))); // wait for the result auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode", - Now() + ResolveTimeout); + mono + ResolveTimeout); // extract address from the result std::vector<NInterconnect::TAddress> addresses; @@ -1049,7 +1053,8 @@ namespace NActors { THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() { Y_VERIFY(PeerNodeId); - Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, Deadline)); + Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, TActivationContext::Now() + + (Deadline - TActivationContext::Monotonic()))); auto response = WaitForSpecificEvent<TEvInterconnect::TEvNodeInfo>("GetPeerNodeInfo"); return std::move(response->Get()->Node); } |