aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_handshake.cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-03-10 10:35:51 +0300
committeralexvru <alexvru@ydb.tech>2023-03-10 10:35:51 +0300
commit1287dc8c20edceeff5805907677892e62da2a68a (patch)
tree8b16accf2a66a00701b8ff5248ab2e7088d0d686 /library/cpp/actors/interconnect/interconnect_handshake.cpp
parent59c758d9fa32f1dc467824707fb8cdf91e8bb731 (diff)
downloadydb-1287dc8c20edceeff5805907677892e62da2a68a.tar.gz
Improve coroutine actor interface
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_handshake.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp23
1 files changed, 14 insertions, 9 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 9926e8fa1f..467c0fd16e 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -96,13 +96,13 @@ namespace NActors {
bool ResolveTimedOut = false;
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
- TInstant Deadline;
+ TMonotonic Deadline;
TActorId HandshakeBroker;
public:
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
- : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
+ : TActorCoroImpl(StackSize, true)
, Common(std::move(common))
, SelfVirtualId(self)
, PeerVirtualId(peer)
@@ -119,7 +119,7 @@ namespace NActors {
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
- : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
+ : TActorCoroImpl(StackSize, true)
, Common(std::move(common))
, Socket(std::move(socket))
, HandshakeKind("incoming handshake")
@@ -169,7 +169,7 @@ namespace NActors {
timeout *= 0.9;
}
- Deadline = Now() + timeout;
+ Deadline = TActivationContext::Monotonic() + timeout;
Schedule(Deadline, new TEvents::TEvWakeup);
try {
@@ -263,6 +263,9 @@ namespace NActors {
case TEvPollerReady::EventType:
break;
+ case TEvents::TSystem::Poison:
+ throw TStopCoroutineException();
+
default:
Y_FAIL("unexpected event 0x%08" PRIx32, type);
}
@@ -835,13 +838,13 @@ namespace NActors {
}
template <typename TEvent>
- THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
+ THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline);
}
template <typename T1, typename T2, typename... TEvents>
- THolder<IEventHandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
+ THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline);
}
@@ -895,11 +898,12 @@ namespace NActors {
void Connect(bool updatePeerAddr) {
// issue request to a nameservice to resolve peer node address
- Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, Deadline));
+ const auto mono = TActivationContext::Monotonic();
+ Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono)));
// wait for the result
auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode",
- Now() + ResolveTimeout);
+ mono + ResolveTimeout);
// extract address from the result
std::vector<NInterconnect::TAddress> addresses;
@@ -1049,7 +1053,8 @@ namespace NActors {
THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() {
Y_VERIFY(PeerNodeId);
- Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, Deadline));
+ Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, TActivationContext::Now() +
+ (Deadline - TActivationContext::Monotonic())));
auto response = WaitForSpecificEvent<TEvInterconnect::TEvNodeInfo>("GetPeerNodeInfo");
return std::move(response->Get()->Node);
}