diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:02 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:02 +0300 |
commit | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch) | |
tree | c2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/actors/interconnect | |
parent | ab3783171cc30e262243a0227c86118f7080c896 (diff) | |
download | ydb-3e0b762a82514bac89c1dd6ea7211e381d8aa248.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect')
34 files changed, 302 insertions, 302 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 8a46ffd535..a7da62c3d7 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -107,29 +107,29 @@ namespace NActors { struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") - TEvHandshakeAsk(const TActorId& self, - const TActorId& peer, + TEvHandshakeAsk(const TActorId& self, + const TActorId& peer, ui64 counter) : Self(self) , Peer(peer) , Counter(counter) { } - const TActorId Self; - const TActorId Peer; + const TActorId Self; + const TActorId Peer; const ui64 Counter; }; struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck") - TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params) + TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params) : Self(self) , NextPacket(nextPacket) , Params(std::move(params)) {} - const TActorId Self; + const TActorId Self; const ui64 NextPacket; const TSessionParams Params; }; @@ -185,8 +185,8 @@ namespace NActors { TEvHandshakeDone( TIntrusivePtr<NInterconnect::TStreamSocket> socket, - const TActorId& peer, - const TActorId& self, + const TActorId& peer, + const TActorId& self, ui64 nextPacket, TAutoPtr<TProgramInfo>&& programInfo, TSessionParams params) @@ -200,8 +200,8 @@ namespace NActors { } TIntrusivePtr<NInterconnect::TStreamSocket> Socket; - const TActorId Peer; - const TActorId Self; + const TActorId Peer; + const TActorId Self; const ui64 NextPacket; TAutoPtr<TProgramInfo> ProgramInfo; const TSessionParams Params; @@ -319,10 +319,10 @@ namespace NActors { template <typename TContainer> TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) { - for (const TActorId& actorId : route) { + for (const TActorId& actorId : route) { auto* hop = Record.AddHops(); if (actorId) { - ActorIdToProto(actorId, hop->MutableNextHop()); + ActorIdToProto(actorId, hop->MutableNextHop()); } } Record.SetId(id); @@ -366,13 +366,13 @@ namespace NActors { }; struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> { - TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize) + TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize) : SessionID(sessionId) , BufferSize(outputBufferSize) { } - TActorId SessionID; + TActorId SessionID; ui64 BufferSize; }; diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h index 225a5243fd..f052a6e92e 100644 --- a/library/cpp/actors/interconnect/interconnect.h +++ b/library/cpp/actors/interconnect/interconnect.h @@ -10,7 +10,7 @@ namespace NActors { TString SelfAddress; ui32 SelfPort; - TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time) + TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time) }; struct TInterconnectProxySetup: public TThrRefBase { @@ -41,12 +41,12 @@ namespace NActors { TIntrusivePtr<TInterconnectGlobalState> GlobalState; - virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls + virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls virtual TActorSetupCmd CreateAcceptor() = 0; }; struct TNameserverSetup { - TActorId ServiceID; + TActorId ServiceID; TIntrusivePtr<TInterconnectGlobalState> GlobalState; }; @@ -118,12 +118,12 @@ namespace NActors { }; struct TNodeRegistrarSetup { - TActorId ServiceID; + TActorId ServiceID; TIntrusivePtr<TInterconnectGlobalState> GlobalState; }; - TActorId GetNameserviceActorId(); + TActorId GetNameserviceActorId(); /** * Const table-lookup based name service diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index e4a0ae3cda..659a6a9e5c 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -1,6 +1,6 @@ #pragma once -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/event_load.h> #include <library/cpp/actors/util/rope.h> diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 285709a00c..81e0694da1 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -3,7 +3,7 @@ #include <library/cpp/actors/core/actorid.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/util/datetime.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/monlib/metrics/metric_registry.h> #include <util/generic/map.h> #include <util/generic/set.h> @@ -63,7 +63,7 @@ namespace NActors { typedef TMap<ui16, TChannelSettings> TChannelsConfig; using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title, - TActorSystem* actorSystem, const TActorId& actorId)>; + TActorSystem* actorSystem, const TActorId& actorId)>; using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>; @@ -71,13 +71,13 @@ namespace NActors { bool orange, bool red, TActorSystem* actorSystem)>; struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> { - TActorId NameserviceId; + TActorId NameserviceId; NMonitoring::TDynamicCounterPtr MonCounters; std::shared_ptr<NMonitoring::IMetricRegistry> Metrics; TChannelsConfig ChannelsConfig; TInterconnectSettings Settings; TRegisterMonPageCallback RegisterMonPage; - TActorId DestructorId; + TActorId DestructorId; std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize; TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024; TString ClusterUUID; diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp index 224160d4b4..e389e93688 100644 --- a/library/cpp/actors/interconnect/interconnect_counters.cpp +++ b/library/cpp/actors/interconnect/interconnect_counters.cpp @@ -619,11 +619,11 @@ namespace { TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read"); for (const char *reason : TDisconnectReason::Reasons) { - DisconnectByReason_[reason] = Metrics_->Rate( - NMonitoring::MakeLabels({ - {"sensor", "interconnect.disconnect_reason"}, - {"reason", reason}, - })); + DisconnectByReason_[reason] = Metrics_->Rate( + NMonitoring::MakeLabels({ + {"sensor", "interconnect.disconnect_reason"}, + {"reason", reason}, + })); } } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 9ede998d8e..51d1e607bc 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -25,8 +25,8 @@ namespace NActors { struct TInitialPacket { struct { - TActorId SelfVirtualId; - TActorId PeerVirtualId; + TActorId SelfVirtualId; + TActorId PeerVirtualId; ui64 NextPacket; ui64 Version; } Header; @@ -34,7 +34,7 @@ namespace NActors { TInitialPacket() = default; - TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) { + TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) { Header.SelfVirtualId = self; Header.PeerVirtualId = peer; Header.NextPacket = nextPacket; @@ -79,8 +79,8 @@ namespace NActors { private: TInterconnectProxyCommon::TPtr Common; - TActorId SelfVirtualId; - TActorId PeerVirtualId; + TActorId SelfVirtualId; + TActorId PeerVirtualId; ui32 PeerNodeId = 0; ui64 NextPacketToPeer = 0; TMaybe<ui64> NextPacketFromPeer; // will be obtained from incoming initial packet @@ -102,7 +102,7 @@ namespace NActors { return IActor::INTERCONNECT_HANDSHAKE; } - THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, + THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors , Common(std::move(common)) @@ -377,7 +377,7 @@ namespace NActors { // set up virtual self id to ensure peer will not drop our connection char buf[12] = {'c', 'o', 'o', 'k', 'i', 'e', ' ', 'c', 'h', 'e', 'c', 'k'}; - SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12)); + SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12)); bool success = true; try { @@ -401,7 +401,7 @@ namespace NActors { request.SetProgramStartTime(0); request.SetSerial(0); request.SetReceiverNodeId(0); - request.SetSenderActorId(TString()); + request.SetSenderActorId(TString()); request.SetCookie(cookie); request.SetDoCheckCookie(true); SendExBlock(request, "SendExBlockDoCheckCookie"); @@ -419,7 +419,7 @@ namespace NActors { } // restore state - SelfVirtualId = TActorId(); + SelfVirtualId = TActorId(); std::swap(tempSocket, Socket); std::swap(tempPollerToken, PollerToken); return success; @@ -455,7 +455,7 @@ namespace NActors { request.SetProgramStartTime(Common->StartTime); request.SetSerial(SelfVirtualId.LocalId()); request.SetReceiverNodeId(PeerNodeId); - request.SetSenderActorId(SelfVirtualId.ToString()); + request.SetSenderActorId(SelfVirtualId.ToString()); request.SetSenderHostName(Common->TechnicalSelfHostName); request.SetReceiverHostName(PeerHostName); @@ -519,7 +519,7 @@ namespace NActors { ValidateClusterUUID(success, generateError); ValidateVersionTag(success, generateError); - const auto& s = success.GetSenderActorId(); + const auto& s = success.GetSenderActorId(); PeerVirtualId.Parse(s.data(), s.size()); // recover flags @@ -599,8 +599,8 @@ namespace NActors { SendInitialPacket(); } else { // peer wants a new session, clear fields and send initial packet - SelfVirtualId = TActorId(); - PeerVirtualId = TActorId(); + SelfVirtualId = TActorId(); + PeerVirtualId = TActorId(); NextPacketToPeer = 0; SendInitialPacket(); @@ -637,7 +637,7 @@ namespace NActors { PeerHostName = request.GetSenderHostName(); // parse peer virtual id - const auto& str = request.GetSenderActorId(); + const auto& str = request.GetSenderActorId(); PeerVirtualId.Parse(str.data(), str.size()); // validate request @@ -709,7 +709,7 @@ namespace NActors { SendExBlock(record, "ExReply"); // extract sender actor id (self virtual id) - const auto& str = success.GetSenderActorId(); + const auto& str = success.GetSenderActorId(); SelfVirtualId.Parse(str.data(), str.size()); } else if (auto ev = reply->CastAsLocal<TEvHandshakeReplyError>()) { // in case of error just send reply to the peer and terminate handshake @@ -981,8 +981,8 @@ namespace NActors { } }; - IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, - const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, + IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, + const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) { return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket, std::move(peerHostName), std::move(params))); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index b3c0db6c5d..7c5c25c3b8 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -15,8 +15,8 @@ namespace NActors { using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; - IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, - const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, + IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, + const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params); IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket); diff --git a/library/cpp/actors/interconnect/interconnect_impl.h b/library/cpp/actors/interconnect/interconnect_impl.h index ee29e4d397..2ca0db8763 100644 --- a/library/cpp/actors/interconnect/interconnect_impl.h +++ b/library/cpp/actors/interconnect/interconnect_impl.h @@ -4,7 +4,7 @@ #include <library/cpp/actors/protos/interconnect.pb.h> #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/helpers/mon_histogram_helper.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> namespace NActors { // resolve node info diff --git a/library/cpp/actors/interconnect/interconnect_mon.cpp b/library/cpp/actors/interconnect/interconnect_mon.cpp index cf924ccbf9..48823c5b0e 100644 --- a/library/cpp/actors/interconnect/interconnect_mon.cpp +++ b/library/cpp/actors/interconnect/interconnect_mon.cpp @@ -1,9 +1,9 @@ #include "interconnect_mon.h" #include "interconnect_tcp_proxy.h" - -#include <library/cpp/json/json_value.h> -#include <library/cpp/json/json_writer.h> -#include <library/cpp/monlib/service/pages/templates.h> + +#include <library/cpp/json/json_value.h> +#include <library/cpp/json/json_writer.h> +#include <library/cpp/monlib/service/pages/templates.h> #include <openssl/ssl.h> #include <openssl/pem.h> @@ -14,7 +14,7 @@ namespace NInterconnect { class TInterconnectMonActor : public TActor<TInterconnectMonActor> { class TQueryProcessor : public TActorBootstrapped<TQueryProcessor> { - const TActorId Sender; + const TActorId Sender; const bool Json; TMap<ui32, TInterconnectProxyTCP::TProxyStats> Stats; ui32 PendingReplies = 0; @@ -24,7 +24,7 @@ namespace NInterconnect { return INTERCONNECT_MONACTOR; } - TQueryProcessor(const TActorId& sender, bool json) + TQueryProcessor(const TActorId& sender, bool json) : Sender(sender) , Json(json) {} diff --git a/library/cpp/actors/interconnect/interconnect_mon.h b/library/cpp/actors/interconnect/interconnect_mon.h index 3fb26053fb..e78229a2c4 100644 --- a/library/cpp/actors/interconnect/interconnect_mon.h +++ b/library/cpp/actors/interconnect/interconnect_mon.h @@ -7,9 +7,9 @@ namespace NInterconnect { NActors::IActor *CreateInterconnectMonActor(TIntrusivePtr<NActors::TInterconnectProxyCommon> common = nullptr); - static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) { + static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) { char s[12] = {'I', 'C', 'O', 'v', 'e', 'r', 'v', 'i', 'e', 'w', 0, 0}; - return NActors::TActorId(nodeId, TStringBuf(s, 12)); + return NActors::TActorId(nodeId, TStringBuf(s, 12)); } } // NInterconnect diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp index 43419bf70d..c9f6f8b5dc 100644 --- a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp +++ b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp @@ -79,8 +79,8 @@ namespace NActors { return true; } - TActorId GetNameserviceActorId() { - return TActorId(0, "namesvc"); + TActorId GetNameserviceActorId() { + return TActorId(0, "namesvc"); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 0abe9fe659..b42ae8dffd 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -6,7 +6,7 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, + TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed, TDuration deadPeerTimeout, TSessionParams params) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7e2d8ccb94..4191951abd 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -3,7 +3,7 @@ #include "interconnect_tcp_session.h" #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/monlib/service/pages/templates.h> #include <util/system/getpid.h> namespace NActors { @@ -45,7 +45,7 @@ namespace NActors { LOG_INFO_IC("ICP01", "ready to work"); } - void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) { + void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) { if (!DynamicPtr) { // perform usual bootstrap for static nodes sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); @@ -311,9 +311,9 @@ namespace NActors { auto event = MakeHolder<TEvHandshakeReplyOK>(); auto* pb = event->Record.MutableSuccess(); - const TActorId virtualId = GenerateSessionVirtualId(); + const TActorId virtualId = GenerateSessionVirtualId(); pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION); - pb->SetSenderActorId(virtualId.ToString()); + pb->SetSenderActorId(virtualId.ToString()); pb->SetProgramPID(GetPID()); pb->SetProgramStartTime(Common->StartTime); pb->SetSerial(virtualId.LocalId()); @@ -536,14 +536,14 @@ namespace NActors { SessionVirtualId.ToString().data()); Session = nullptr; - SessionID = TActorId(); + SessionID = TActorId(); // drop all pending events as we are closed ProcessPendingSessionEvents(); // reset virtual ids as this session is terminated - SessionVirtualId = TActorId(); - RemoteSessionVirtualId = TActorId(); + SessionVirtualId = TActorId(); + RemoteSessionVirtualId = TActorId(); if (Metrics) { Metrics->IncSessionDeaths(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 023e5bd1ee..e5921134ed 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -4,7 +4,7 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/core/events.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include "interconnect_common.h" #include "interconnect_counters.h" @@ -70,7 +70,7 @@ namespace NActors { } void Bootstrap(); - void Registered(TActorSystem* sys, const TActorId& owner) override; + void Registered(TActorSystem* sys, const TActorId& owner) override; private: friend class TInterconnectSessionTCP; @@ -366,7 +366,7 @@ namespace NActors { // read only TInterconnectProxyCommon::TPtr const Common; - const TActorId& GetNameserviceId() const { + const TActorId& GetNameserviceId() const { return Common->NameserviceId; } @@ -403,24 +403,24 @@ namespace NActors { void DropSessionEvent(STATEFN_SIG); TInterconnectSessionTCP* Session = nullptr; - TActorId SessionID; + TActorId SessionID; // virtual ids used during handshake to check if it is the connection // for the same session or to find out the latest shandshake // it's virtual because session actor apears after successfull handshake - TActorId SessionVirtualId; - TActorId RemoteSessionVirtualId; + TActorId SessionVirtualId; + TActorId RemoteSessionVirtualId; - TActorId GenerateSessionVirtualId() { + TActorId GenerateSessionVirtualId() { ICPROXY_PROFILED; const ui64 localId = TlsActivationContext->ExecutorThread.ActorSystem->AllocateIDSpace(1); - return NActors::TActorId(SelfId().NodeId(), 0, localId, 0); + return NActors::TActorId(SelfId().NodeId(), 0, localId, 0); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TActorId IncomingHandshakeActor; + TActorId IncomingHandshakeActor; TInstant IncomingHandshakeActorFilledIn; TInstant IncomingHandshakeActorReset; TMaybe<ui64> LastSerialFromIncomingHandshake; @@ -429,7 +429,7 @@ namespace NActors { void DropIncomingHandshake(bool poison = true) { ICPROXY_PROFILED; - if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) { + if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) { LOG_DEBUG_IC("ICP111", "dropped incoming handshake: %s poison: %s", actorId.ToString().data(), poison ? "true" : "false"); if (poison) { @@ -444,7 +444,7 @@ namespace NActors { void DropOutgoingHandshake(bool poison = true) { ICPROXY_PROFILED; - if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) { + if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) { LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(), poison ? "true" : "false"); if (poison) { @@ -477,12 +477,12 @@ namespace NActors { SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection); } - void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId, + void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId, THolder<IEventBase> event); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TActorId OutgoingHandshakeActor; + TActorId OutgoingHandshakeActor; TInstant OutgoingHandshakeActorCreated; TInstant OutgoingHandshakeActorReset; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index b95c994598..2c025dc389 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -23,7 +23,7 @@ namespace NActors { } } - TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { + TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h index fc71073c2d..086fe26ab3 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h @@ -34,7 +34,7 @@ namespace NActors { } } - TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override; + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override; void Die(const TActorContext& ctx) override; @@ -50,8 +50,8 @@ namespace NActors { TInterconnectProxyCommon::TPtr const ProxyCommonCtx; }; - static inline TActorId MakeInterconnectListenerActorId(bool dynamic) { + static inline TActorId MakeInterconnectListenerActorId(bool dynamic) { char x[12] = {'I', 'C', 'L', 'i', 's', 't', 'e', 'n', 'e', 'r', '/', dynamic ? 'D' : 'S'}; - return TActorId(0, TStringBuf(x, 12)); + return TActorId(0, TStringBuf(x, 12)); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2ded7f9f53..468e8bdd64 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -7,7 +7,7 @@ #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/monlib/service/pages/templates.h> namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); @@ -474,7 +474,7 @@ namespace NActors { if (ev->Sender == ReceiverId) { const bool wasConnected(Socket); LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data()); - ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet + ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet if (wasConnected) { // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should // restart handshake process, closing our part of socket first diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7fc00dbcc5..dfab4065c0 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -10,7 +10,7 @@ #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/util/funnel_queue.h> #include <library/cpp/actors/util/recentwnd.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <util/generic/queue.h> @@ -179,7 +179,7 @@ namespace NActors { return INTERCONNECT_SESSION_TCP; } - TInputSessionTCP(const TActorId& sessionId, + TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, @@ -495,7 +495,7 @@ namespace NActors { void GenerateHttpInfo(TStringStream& str); TIntrusivePtr<TReceiveContext> ReceiveContext; - TActorId ReceiverId; + TActorId ReceiverId; TDuration Ping; ui64 ConfirmPacketsForcedBySize = 0; @@ -513,7 +513,7 @@ namespace NActors { : public TActorBootstrapped<TInterconnectSessionKiller> { ui32 RepliesReceived = 0; ui32 RepliesNumber = 0; - TActorId LargestSession = TActorId(); + TActorId LargestSession = TActorId(); ui64 MaxBufferSize = 0; TInterconnectProxyCommon::TPtr Common; @@ -529,7 +529,7 @@ namespace NActors { void Bootstrap() { auto sender = SelfId(); - const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { + const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { auto ev = new TEvSessionBufferSizeRequest(); return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); }; diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index 2a8443da71..22850b3126 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -72,7 +72,7 @@ namespace NInterconnect { }; class TLoadResponderMasterActor : public TActorBootstrapped<TLoadResponderMasterActor> { - TVector<TActorId> Slaves; + TVector<TActorId> Slaves; ui32 SlaveIndex = 0; STRICT_STFUNC(StateFunc, @@ -93,7 +93,7 @@ namespace NInterconnect { } void Die(const TActorContext& ctx) override { - for (const TActorId& actorId : Slaves) { + for (const TActorId& actorId : Slaves) { ctx.Send(actorId, new TEvents::TEvPoisonPill); } TActorBootstrapped::Die(ctx); @@ -122,9 +122,9 @@ namespace NInterconnect { return new TLoadResponderMasterActor(); } - TActorId MakeLoadResponderActorId(ui32 nodeId) { + TActorId MakeLoadResponderActorId(ui32 nodeId) { char x[12] = {'I', 'C', 'L', 'o', 'a', 'd', 'R', 'e', 's', 'p', 'A', 'c'}; - return TActorId(nodeId, TStringBuf(x, 12)); + return TActorId(nodeId, TStringBuf(x, 12)); } class TLoadActor: public TActorBootstrapped<TLoadActor> { @@ -144,8 +144,8 @@ namespace NInterconnect { TInstant NextMessageTimestamp; THashMap<TString, TMessageInfo> InFly; ui64 NextId = 1; - TVector<TActorId> Hops; - TActorId FirstHop; + TVector<TActorId> Hops; + TActorId FirstHop; ui64 NumDropped = 0; std::shared_ptr<std::atomic_uint64_t> Traffic; @@ -167,7 +167,7 @@ namespace NInterconnect { Traffic = std::move(ev->Get()->Traffic); for (const ui32 nodeId : Params.NodeHops) { - const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId(); + const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId(); if (!FirstHop) { FirstHop = actorId; } else { diff --git a/library/cpp/actors/interconnect/load.h b/library/cpp/actors/interconnect/load.h index 0a01a0dc04..060fa7641b 100644 --- a/library/cpp/actors/interconnect/load.h +++ b/library/cpp/actors/interconnect/load.h @@ -5,7 +5,7 @@ namespace NInterconnect { // load responder -- lives on every node as a service actor NActors::IActor* CreateLoadResponderActor(); - NActors::TActorId MakeLoadResponderActorId(ui32 node); + NActors::TActorId MakeLoadResponderActorId(ui32 node); // load actor -- generates load with specific parameters struct TLoadParams { diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp index 884503e602..1267920559 100644 --- a/library/cpp/actors/interconnect/mock/ic_mock.cpp +++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp @@ -42,7 +42,7 @@ namespace NActors { : Key(key) {} - void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) { + void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) { TPeerInfo *peer = GetPeer(nodeId); auto guard = TWriteGuard(peer->Mutex); Y_VERIFY(!peer->ActorSystem); @@ -188,7 +188,7 @@ namespace NActors { , Common(std::move(common)) {} - void Registered(TActorSystem *as, const TActorId& parent) override { + void Registered(TActorSystem *as, const TActorId& parent) override { TActor::Registered(as, parent); State.Attach(NodeId, as, SelfId()); } diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 4ba50a2b5f..187d0b6bdf 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -18,7 +18,7 @@ using NActors::IEventBase; using NActors::IEventHandle; -using NActors::TActorId; +using NActors::TActorId; using NActors::TConstIoVec; using NActors::TEventSerializedData; @@ -91,8 +91,8 @@ union TTcpPacketBuf { struct TEventDescr { ui32 Type; ui32 Flags; - TActorId Recipient; - TActorId Sender; + TActorId Recipient; + TActorId Sender; ui64 Cookie; // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor NWilson::TTraceId::TSerializedTraceId TraceId; @@ -102,7 +102,7 @@ struct TEventDescr { struct TEventHolder : TNonCopyable { TEventDescr Descr; - TActorId ForwardRecipient; + TActorId ForwardRecipient; THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; ui64 Serial; diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef4..8c7b61a7a7 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -1,35 +1,35 @@ #include "poller_actor.h" #include "interconnect_common.h" -#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/core/probes.h> -#include <library/cpp/actors/protos/services_common.pb.h> +#include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/actors/util/funnel_queue.h> - + #include <util/generic/intrlist.h> #include <util/system/thread.h> #include <util/system/event.h> #include <util/system/pipe.h> #include <variant> - + namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - namespace { + namespace { int LastSocketError() { #if defined(_win_) return WSAGetLastError(); #else return errno; #endif - } - } - + } + } + struct TSocketRecord : TThrRefBase { const TIntrusivePtr<TSharedDescriptor> Socket; const TActorId ReadActorId; @@ -57,7 +57,7 @@ namespace NActors { : Socket(std::move(socket)) {} }; - + using TPollerSyncOperation = std::variant<TPollerExitThread, TPollerWakeup, TPollerUnregisterSocket>; struct TPollerSyncOperationWrapper { @@ -149,7 +149,7 @@ namespace NActors { bool DrainReadEnd() { size_t totalRead = 0; char buffer[4096]; - for (;;) { + for (;;) { ssize_t n = ReadEnd.Read(buffer, sizeof(buffer)); if (n < 0) { const int error = LastSocketError(); @@ -157,17 +157,17 @@ namespace NActors { continue; } else if (error == EAGAIN || error == EWOULDBLOCK) { break; - } else { + } else { Y_FAIL("read() failed with %s", strerror(errno)); - } + } } else { Y_VERIFY(n); totalRead += n; - } + } } return totalRead; } - + bool ProcessSyncOpQueue() { if (DrainReadEnd()) { Y_VERIFY(!SyncOperationsQ.IsEmpty()); @@ -181,25 +181,25 @@ namespace NActors { return false; // terminate the thread } else if (std::get_if<TPollerWakeup>(&op->Operation)) { op->SignalDone(); - } else { + } else { Y_FAIL(); - } + } } while (SyncOperationsQ.Pop()); - } + } return true; - } - + } + void *ThreadProc() override { SetCurrentThreadName("network poller"); while (ProcessSyncOpQueue()) { static_cast<TDerived&>(*this).ProcessEventsInLoop(); - } + } return nullptr; - } + } }; - + } // namespace NActors - + #if defined(_linux_) # include "poller_actor_linux.h" #elif defined(_darwin_) @@ -209,38 +209,38 @@ namespace NActors { #else # error "Unsupported platform" #endif - + namespace NActors { - + class TPollerToken::TImpl { std::weak_ptr<TPollerThread> Thread; TIntrusivePtr<TSocketRecord> Record; // valid only when Thread is held locked - - public: + + public: TImpl(std::shared_ptr<TPollerThread> thread, TIntrusivePtr<TSocketRecord> record) : Thread(thread) , Record(std::move(record)) - { + { thread->RegisterSocket(Record); } - + ~TImpl() { if (auto thread = Thread.lock()) { thread->UnregisterSocket(Record); - } - } - + } + } + void Request(bool read, bool write) { if (auto thread = Thread.lock()) { thread->Request(Record, read, write); - } - } + } + } const TIntrusivePtr<TSharedDescriptor>& Socket() const { return Record->Socket; } - }; - + }; + class TPollerActor: public TActorBootstrapped<TPollerActor> { // poller thread std::shared_ptr<TPollerThread> PollerThread; diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h index f927b82089..5bd4f50704 100644 --- a/library/cpp/actors/interconnect/poller_actor.h +++ b/library/cpp/actors/interconnect/poller_actor.h @@ -55,9 +55,9 @@ namespace NActors { IActor* CreatePollerActor(); - inline TActorId MakePollerActorId() { + inline TActorId MakePollerActorId() { char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); } } diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859..bbdabbd339 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index e6b2bd4e4c..334859882f 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -2,7 +2,7 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/interconnect/interconnect_common.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/interconnect/event_holder_pool.h> #include <atomic> diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp index ba2a50c6f6..d67509f058 100644 --- a/library/cpp/actors/interconnect/ut/large.cpp +++ b/library/cpp/actors/interconnect/ut/large.cpp @@ -14,10 +14,10 @@ Y_UNIT_TEST_SUITE(LargeMessage) { using namespace NActors; class TProducer: public TActorBootstrapped<TProducer> { - const TActorId RecipientActorId; + const TActorId RecipientActorId; public: - TProducer(const TActorId& recipientActorId) + TProducer(const TActorId& recipientActorId) : RecipientActorId(recipientActorId) {} @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) { class TConsumer : public TActorBootstrapped<TConsumer> { TManualEvent& Done; - TActorId SessionId; + TActorId SessionId; public: TConsumer(TManualEvent& done) @@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) { TManualEvent done; TConsumer* consumer = new TConsumer(done); - const TActorId recp = testCluster.RegisterActor(consumer, 1); + const TActorId recp = testCluster.RegisterActor(consumer, 1); testCluster.RegisterActor(new TProducer(recp), 2); done.WaitI(); } diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index 2b6d27cd3f..ac46180804 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -70,7 +70,7 @@ public: ~TTestICCluster() { } - TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { + TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { return Nodes[nodeId]->RegisterActor(actor); } @@ -78,7 +78,7 @@ public: return Nodes[nodeId]->InterconnectProxy(peerNodeId); } - void KillActor(ui32 nodeId, const TActorId& id) { + void KillActor(ui32 nodeId, const TActorId& id) { Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); } }; diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index ff30b1445e..59dd2554c8 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -62,7 +62,7 @@ public: setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); - const TActorId loggerActorId(0, "logger"); + const TActorId loggerActorId(0, "logger"); constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER auto loggerSettings = MakeIntrusive<NLog::TSettings>( @@ -114,7 +114,7 @@ public: ActorSystem->Stop(); } - bool Send(const TActorId& recipient, IEventBase* ev) { + bool Send(const TActorId& recipient, IEventBase* ev) { return ActorSystem->Send(recipient, ev); } @@ -127,7 +127,7 @@ public: } void RegisterServiceActor(const TActorId& serviceId, IActor* actor) { - const TActorId actorId = ActorSystem->Register(actor); + const TActorId actorId = ActorSystem->Register(actor); ActorSystem->RegisterLocalService(serviceId, actorId); } diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h index 7591200471..07fe10d93a 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_actors.h +++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h @@ -3,13 +3,13 @@ namespace NActors { class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> { protected: - const TActorId RecipientActorId; + const TActorId RecipientActorId; const ui32 Preload; ui64 SequenceNumber = 0; ui32 InFlySize = 0; public: - TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) + TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) : RecipientActorId(recipientActorId) , Preload(preload) { diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp index 23d846a2fd..dbd05ce746 100644 --- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp +++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp @@ -1,38 +1,38 @@ -#include <library/cpp/actors/interconnect/poller_actor.h> -#include <library/cpp/actors/testlib/test_runtime.h> - +#include <library/cpp/actors/interconnect/poller_actor.h> +#include <library/cpp/actors/testlib/test_runtime.h> + #include <library/cpp/testing/unittest/registar.h> - -#include <util/network/pair.h> -#include <util/network/socket.h> - -using namespace NActors; - -class TTestSocket: public TSharedDescriptor { -public: - explicit TTestSocket(SOCKET fd) - : Fd_(fd) - { - } - - int GetDescriptor() override { - return Fd_; - } - -private: - SOCKET Fd_; -}; -using TTestSocketPtr = TIntrusivePtr<TTestSocket>; - -// create pair of connected, non-blocking sockets -std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() { - SOCKET fds[2]; - SocketPair(fds); - SetNonBlock(fds[0]); - SetNonBlock(fds[1]); - return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])}; -} - + +#include <util/network/pair.h> +#include <util/network/socket.h> + +using namespace NActors; + +class TTestSocket: public TSharedDescriptor { +public: + explicit TTestSocket(SOCKET fd) + : Fd_(fd) + { + } + + int GetDescriptor() override { + return Fd_; + } + +private: + SOCKET Fd_; +}; +using TTestSocketPtr = TIntrusivePtr<TTestSocket>; + +// create pair of connected, non-blocking sockets +std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() { + SOCKET fds[2]; + SocketPair(fds); + SetNonBlock(fds[0]); + SetNonBlock(fds[1]); + return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])}; +} + std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() { // create server (listening) socket SOCKET server = socket(AF_INET, SOCK_STREAM, 0); @@ -74,101 +74,101 @@ std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() { return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted)); } -class TPollerActorTest: public TTestBase { - UNIT_TEST_SUITE(TPollerActorTest); - UNIT_TEST(Registration) - UNIT_TEST(ReadNotification) - UNIT_TEST(WriteNotification) - UNIT_TEST(HangupNotification) - UNIT_TEST_SUITE_END(); - -public: - void SetUp() override { - ActorSystem_ = MakeHolder<TTestActorRuntimeBase>(); - ActorSystem_->Initialize(); - - PollerId_ = ActorSystem_->Register(CreatePollerActor()); - - TDispatchOptions opts; - opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); - ActorSystem_->DispatchEvents(opts); - } - - void Registration() { - auto [s1, s2] = NonBlockSockets(); - auto readerId = ActorSystem_->AllocateEdgeActor(); - auto writerId = ActorSystem_->AllocateEdgeActor(); - +class TPollerActorTest: public TTestBase { + UNIT_TEST_SUITE(TPollerActorTest); + UNIT_TEST(Registration) + UNIT_TEST(ReadNotification) + UNIT_TEST(WriteNotification) + UNIT_TEST(HangupNotification) + UNIT_TEST_SUITE_END(); + +public: + void SetUp() override { + ActorSystem_ = MakeHolder<TTestActorRuntimeBase>(); + ActorSystem_->Initialize(); + + PollerId_ = ActorSystem_->Register(CreatePollerActor()); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); + ActorSystem_->DispatchEvents(opts); + } + + void Registration() { + auto [s1, s2] = NonBlockSockets(); + auto readerId = ActorSystem_->AllocateEdgeActor(); + auto writerId = ActorSystem_->AllocateEdgeActor(); + RegisterSocket(s1, readerId, writerId); - - // reader should receive event after socket registration + + // reader should receive event after socket registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId); token = ev->Get()->PollerToken; - } - - // writer should receive event after socket registration - { + } + + // writer should receive event after socket registration + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId); UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken); - } - } - - void ReadNotification() { - auto [r, w] = NonBlockSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + } + } + + void ReadNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); RegisterSocket(r, clientId, {}); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - - char buf; - - // data not ready yet for read - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); - UNIT_ASSERT(errno == EWOULDBLOCK); - + } + + char buf; + + // data not ready yet for read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + // request read poll token->Request(true, false); - // write data - UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1); - - // notification after socket become readable - { + // write data + UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1); + + // notification after socket become readable + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); UNIT_ASSERT(ev->Get()->Read); UNIT_ASSERT(!ev->Get()->Write); - } - - // read data - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1); - UNIT_ASSERT_EQUAL('x', buf); - - // no more data to read - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); - UNIT_ASSERT(errno == EWOULDBLOCK); - } - - void WriteNotification() { + } + + // read data + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1); + UNIT_ASSERT_EQUAL('x', buf); + + // no more data to read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + } + + void WriteNotification() { auto [r, w] = TcpSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + auto clientId = ActorSystem_->AllocateEdgeActor(); SetNonBlock(w->GetDescriptor()); RegisterSocket(w, TActorId{}, clientId); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - + } + char buffer[4096]; memset(buffer, 'x', sizeof(buffer)); @@ -181,7 +181,7 @@ public: written += res; } else if (res == 0) { UNIT_FAIL("unexpected zero return from send()"); - } else { + } else { UNIT_ASSERT(res == -1); if (errno == EINTR) { continue; @@ -191,10 +191,10 @@ public: } else { UNIT_FAIL("unexpected error from send()"); } - } - } + } + } Cerr << "written " << written << " bytes" << Endl; - + // read all written data from the read end for (;;) { char buffer[4096]; @@ -216,7 +216,7 @@ public: } } } - + // wait for notification after socket becomes writable again { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); @@ -224,41 +224,41 @@ public: UNIT_ASSERT(!ev->Get()->Read); UNIT_ASSERT(ev->Get()->Write); } - } - } - - void HangupNotification() { - auto [r, w] = NonBlockSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + } + } + + void HangupNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); RegisterSocket(r, clientId, TActorId{}); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - + } + token->Request(true, false); ShutDown(w->GetDescriptor(), SHUT_RDWR); - + // notification after peer shuts down its socket - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); UNIT_ASSERT(ev->Get()->Read); - } - } - -private: + } + } + +private: void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) { auto ev = new TEvPollerRegister{socket, readActorId, writeActorId}; - ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); - } - -private: - THolder<TTestActorRuntimeBase> ActorSystem_; - TActorId PollerId_; -}; - -UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest); + ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); + } + +private: + THolder<TTestActorRuntimeBase> ActorSystem_; + TActorId PollerId_; +}; + +UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest); diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make index 2f5b13352e..ec19f1a64a 100644 --- a/library/cpp/actors/interconnect/ut/ya.make +++ b/library/cpp/actors/interconnect/ut/ya.make @@ -15,11 +15,11 @@ ELSE() ENDIF() SRCS( - channel_scheduler_ut.cpp + channel_scheduler_ut.cpp event_holder_pool_ut.cpp interconnect_ut.cpp large.cpp - poller_actor_ut.cpp + poller_actor_ut.cpp dynamic_proxy_ut.cpp ) @@ -28,7 +28,7 @@ PEERDIR( library/cpp/actors/interconnect library/cpp/actors/interconnect/ut/lib library/cpp/actors/interconnect/ut/protos - library/cpp/actors/testlib + library/cpp/actors/testlib library/cpp/digest/md5 library/cpp/testing/unittest ) diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp index 5d19bc3003..69374cd080 100644 --- a/library/cpp/actors/interconnect/ut_fat/main.cpp +++ b/library/cpp/actors/interconnect/ut_fat/main.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { ui16 SendFlags; public: - TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) + TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) : TSenderBaseActor(recipientActorId, 32) , SendFlags(sendFlags) { @@ -108,7 +108,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); - const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); TSenderActor* senderActor = new TSenderActor(recipient, flags); testCluster.RegisterActor(senderActor, 1); @@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); - const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); TSenderActor* senderActor = new TSenderActor(recipient, flags); testCluster.RegisterActor(senderActor, 1); diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make index 60d29b0fc0..9e4fb46fdb 100644 --- a/library/cpp/actors/interconnect/ya.make +++ b/library/cpp/actors/interconnect/ya.make @@ -75,18 +75,18 @@ PEERDIR( contrib/libs/libc_compat contrib/libs/openssl library/cpp/actors/core - library/cpp/actors/dnscachelib + library/cpp/actors/dnscachelib library/cpp/actors/dnsresolver library/cpp/actors/helpers library/cpp/actors/prof library/cpp/actors/protos library/cpp/actors/util library/cpp/digest/crc32c - library/cpp/json + library/cpp/json library/cpp/lwtrace - library/cpp/monlib/dynamic_counters + library/cpp/monlib/dynamic_counters library/cpp/monlib/metrics - library/cpp/monlib/service/pages/tablesorter + library/cpp/monlib/service/pages/tablesorter library/cpp/openssl/init library/cpp/packedtypes ) |