aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/event_holder_pool.h4
-rw-r--r--library/cpp/actors/interconnect/events_local.h550
-rw-r--r--library/cpp/actors/interconnect/interconnect.h188
-rw-r--r--library/cpp/actors/interconnect/interconnect_address.cpp60
-rw-r--r--library/cpp/actors/interconnect/interconnect_address.h22
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp18
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h44
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h56
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp18
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.h6
-rw-r--r--library/cpp/actors/interconnect/interconnect_impl.h60
-rw-r--r--library/cpp/actors/interconnect/interconnect_nameserver_table.cpp60
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.cpp340
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.h64
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp588
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h322
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp24
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.h38
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp598
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h280
-rw-r--r--library/cpp/actors/interconnect/load.cpp44
-rw-r--r--library/cpp/actors/interconnect/load.h16
-rw-r--r--library/cpp/actors/interconnect/logging.h6
-rw-r--r--library/cpp/actors/interconnect/packet.h20
-rw-r--r--library/cpp/actors/interconnect/poller.h24
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp6
-rw-r--r--library/cpp/actors/interconnect/poller_actor.h4
-rw-r--r--library/cpp/actors/interconnect/poller_tcp.cpp50
-rw-r--r--library/cpp/actors/interconnect/poller_tcp.h22
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit.cpp192
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit.h76
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp192
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit_epoll.h40
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit_select.cpp102
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit_select.h20
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h8
-rw-r--r--library/cpp/actors/interconnect/ut/lib/interrupter.h28
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h4
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_actors.h92
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_events.h2
-rw-r--r--library/cpp/actors/interconnect/ut_fat/main.cpp4
-rw-r--r--library/cpp/actors/interconnect/watchdog_timer.h10
43 files changed, 2154 insertions, 2154 deletions
diff --git a/library/cpp/actors/interconnect/event_holder_pool.h b/library/cpp/actors/interconnect/event_holder_pool.h
index 9fe5e03be6..b6090a3bc8 100644
--- a/library/cpp/actors/interconnect/event_holder_pool.h
+++ b/library/cpp/actors/interconnect/event_holder_pool.h
@@ -117,7 +117,7 @@ namespace NActors {
}
private:
- TEvFreeItems* GetPendingEvent() {
+ TEvFreeItems* GetPendingEvent() {
if (!PendingFreeEvent) {
PendingFreeEvent.Reset(new TEvFreeItems);
}
@@ -125,4 +125,4 @@ namespace NActors {
}
};
-}
+}
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 7880ca283f..8a46ffd535 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -11,324 +11,324 @@
#include "types.h"
namespace NActors {
- struct TProgramInfo {
- ui64 PID = 0;
- ui64 StartTime = 0;
- ui64 Serial = 0;
- };
-
- enum class ENetwork : ui32 {
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // local messages
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP),
-
- SocketReadyRead = Start,
- SocketReadyWrite,
- SocketError,
- Connect,
- Disconnect,
- IncomingConnection,
- HandshakeAsk,
- HandshakeAck,
+ struct TProgramInfo {
+ ui64 PID = 0;
+ ui64 StartTime = 0;
+ ui64 Serial = 0;
+ };
+
+ enum class ENetwork : ui32 {
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // local messages
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP),
+
+ SocketReadyRead = Start,
+ SocketReadyWrite,
+ SocketError,
+ Connect,
+ Disconnect,
+ IncomingConnection,
+ HandshakeAsk,
+ HandshakeAck,
HandshakeNak,
- HandshakeDone,
- HandshakeFail,
- Kick,
- Flush,
- NodeInfo,
- BunchOfEventsToDestroy,
- HandshakeRequest,
- HandshakeReplyOK,
- HandshakeReplyError,
- ResolveAddress,
- AddressInfo,
- ResolveError,
- HTTPStreamStatus,
- HTTPSendContent,
- ConnectProtocolWakeup,
- HTTPProtocolRetry,
- EvPollerRegister,
+ HandshakeDone,
+ HandshakeFail,
+ Kick,
+ Flush,
+ NodeInfo,
+ BunchOfEventsToDestroy,
+ HandshakeRequest,
+ HandshakeReplyOK,
+ HandshakeReplyError,
+ ResolveAddress,
+ AddressInfo,
+ ResolveError,
+ HTTPStreamStatus,
+ HTTPSendContent,
+ ConnectProtocolWakeup,
+ HTTPProtocolRetry,
+ EvPollerRegister,
EvPollerRegisterResult,
EvPollerReady,
- EvUpdateFromInputSession,
+ EvUpdateFromInputSession,
EvConfirmUpdate,
- EvSessionBufferSizeRequest,
- EvSessionBufferSizeResponse,
+ EvSessionBufferSizeRequest,
+ EvSessionBufferSizeResponse,
EvProcessPingRequest,
EvGetSecureSocket,
EvSecureSocket,
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- // interconnect load test message
- EvLoadMessage = Start + 256,
- };
-
- struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead")
- };
-
- struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite")
- };
-
- struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error))
- TString GetReason() const {
- return ::strerror(Error);
- }
- const int Error;
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
-
- TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock)
- : Error(error)
- , Socket(std::move(sock))
- {
- }
- };
-
- struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect")
- };
-
- struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect")
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // interconnect load test message
+ EvLoadMessage = Start + 256,
+ };
+
+ struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead")
+ };
+
+ struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite")
+ };
+
+ struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error))
+ TString GetReason() const {
+ return ::strerror(Error);
+ }
+ const int Error;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+
+ TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock)
+ : Error(error)
+ , Socket(std::move(sock))
+ {
+ }
+ };
+
+ struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect")
+ };
+
+ struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect")
TDisconnectReason Reason;
TEvSocketDisconnect(TDisconnectReason reason)
: Reason(std::move(reason))
- {
- }
- };
+ {
+ }
+ };
- struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
+ struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
TEvHandshakeAsk(const TActorId& self,
const TActorId& peer,
- ui64 counter)
- : Self(self)
- , Peer(peer)
- , Counter(counter)
- {
- }
+ ui64 counter)
+ : Self(self)
+ , Peer(peer)
+ , Counter(counter)
+ {
+ }
const TActorId Self;
const TActorId Peer;
- const ui64 Counter;
- };
+ const ui64 Counter;
+ };
- struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck")
+ struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck")
TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
- : Self(self)
- , NextPacket(nextPacket)
+ : Self(self)
+ , NextPacket(nextPacket)
, Params(std::move(params))
{}
const TActorId Self;
- const ui64 NextPacket;
+ const ui64 NextPacket;
const TSessionParams Params;
- };
+ };
struct TEvHandshakeNak : TEventLocal<TEvHandshakeNak, ui32(ENetwork::HandshakeNak)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvHandshakeNak")
};
- struct TEvHandshakeRequest
- : public TEventLocal<TEvHandshakeRequest,
- ui32(ENetwork::HandshakeRequest)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest,
- "Network: TEvHandshakeRequest")
+ struct TEvHandshakeRequest
+ : public TEventLocal<TEvHandshakeRequest,
+ ui32(ENetwork::HandshakeRequest)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest,
+ "Network: TEvHandshakeRequest")
- NActorsInterconnect::THandshakeRequest Record;
- };
+ NActorsInterconnect::THandshakeRequest Record;
+ };
- struct TEvHandshakeReplyOK
- : public TEventLocal<TEvHandshakeReplyOK,
- ui32(ENetwork::HandshakeReplyOK)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK,
- "Network: TEvHandshakeReplyOK")
+ struct TEvHandshakeReplyOK
+ : public TEventLocal<TEvHandshakeReplyOK,
+ ui32(ENetwork::HandshakeReplyOK)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK,
+ "Network: TEvHandshakeReplyOK")
- NActorsInterconnect::THandshakeReply Record;
- };
+ NActorsInterconnect::THandshakeReply Record;
+ };
- struct TEvHandshakeReplyError
- : public TEventLocal<TEvHandshakeReplyError,
- ui32(ENetwork::HandshakeReplyError)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError,
- "Network: TEvHandshakeReplyError")
+ struct TEvHandshakeReplyError
+ : public TEventLocal<TEvHandshakeReplyError,
+ ui32(ENetwork::HandshakeReplyError)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError,
+ "Network: TEvHandshakeReplyError")
- TEvHandshakeReplyError(TString error) {
- Record.SetErrorExplaination(error);
- }
+ TEvHandshakeReplyError(TString error) {
+ Record.SetErrorExplaination(error);
+ }
- NActorsInterconnect::THandshakeReply Record;
- };
+ NActorsInterconnect::THandshakeReply Record;
+ };
- struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection")
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
- NInterconnect::TAddress Address;
+ struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection")
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ NInterconnect::TAddress Address;
TEvIncomingConnection(TIntrusivePtr<NInterconnect::TStreamSocket> socket, NInterconnect::TAddress address)
: Socket(std::move(socket))
, Address(std::move(address))
{}
- };
+ };
- struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone")
+ struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone")
- TEvHandshakeDone(
+ TEvHandshakeDone(
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
const TActorId& peer,
const TActorId& self,
ui64 nextPacket,
TAutoPtr<TProgramInfo>&& programInfo,
TSessionParams params)
- : Socket(std::move(socket))
- , Peer(peer)
- , Self(self)
- , NextPacket(nextPacket)
- , ProgramInfo(std::move(programInfo))
+ : Socket(std::move(socket))
+ , Peer(peer)
+ , Self(self)
+ , NextPacket(nextPacket)
+ , ProgramInfo(std::move(programInfo))
, Params(std::move(params))
- {
- }
+ {
+ }
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
const TActorId Peer;
const TActorId Self;
- const ui64 NextPacket;
- TAutoPtr<TProgramInfo> ProgramInfo;
+ const ui64 NextPacket;
+ TAutoPtr<TProgramInfo> ProgramInfo;
const TSessionParams Params;
- };
+ };
+
+ struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail")
- struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail")
+ enum EnumHandshakeFail {
+ HANDSHAKE_FAIL_TRANSIENT,
+ HANDSHAKE_FAIL_PERMANENT,
+ HANDSHAKE_FAIL_SESSION_MISMATCH,
+ };
- enum EnumHandshakeFail {
- HANDSHAKE_FAIL_TRANSIENT,
- HANDSHAKE_FAIL_PERMANENT,
- HANDSHAKE_FAIL_SESSION_MISMATCH,
- };
-
TEvHandshakeFail(EnumHandshakeFail temporary, TString explanation)
- : Temporary(temporary)
- , Explanation(std::move(explanation))
- {
- }
-
- const EnumHandshakeFail Temporary;
- const TString Explanation;
- };
-
- struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick")
- };
-
- struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush")
- };
-
- struct TEvLocalNodeInfo
- : public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo")
-
- ui32 NodeId;
- NAddr::IRemoteAddrPtr Address;
- };
-
- struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy,
- "Network: TEvBunchOfEventsToDestroy")
-
- TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events)
- : Events(std::move(events))
- {
- }
-
- TDeque<TAutoPtr<IEventBase>> Events;
- };
-
- struct TEvResolveAddress
- : public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress")
-
- TString Address;
- ui16 Port;
- };
-
- struct TEvAddressInfo
- : public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo")
-
- NAddr::IRemoteAddrPtr Address;
- };
-
- struct TEvResolveError
- : public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError")
-
- TString Explain;
- };
-
- struct TEvHTTPStreamStatus
- : public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus,
- "Network: TEvHTTPStreamStatus")
- enum EStatus {
- READY,
- COMPLETE,
- ERROR,
- };
-
- EStatus Status;
- TString Error;
- TString HttpHeaders;
- };
-
- struct TEvHTTPSendContent
- : public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent")
-
- const char* Data;
- size_t Len;
- bool Last;
- };
-
- struct TEvConnectWakeup
- : public TEventLocal<TEvConnectWakeup,
- ui32(ENetwork::ConnectProtocolWakeup)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup")
- };
-
- struct TEvHTTPProtocolRetry
- : public TEventLocal<TEvHTTPProtocolRetry,
- ui32(ENetwork::HTTPProtocolRetry)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry,
- "Protocols: TEvHTTPProtocolRetry")
- };
-
- struct TEvLoadMessage
- : TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> {
- TEvLoadMessage() = default;
-
- template <typename TContainer>
+ : Temporary(temporary)
+ , Explanation(std::move(explanation))
+ {
+ }
+
+ const EnumHandshakeFail Temporary;
+ const TString Explanation;
+ };
+
+ struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick")
+ };
+
+ struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush")
+ };
+
+ struct TEvLocalNodeInfo
+ : public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo")
+
+ ui32 NodeId;
+ NAddr::IRemoteAddrPtr Address;
+ };
+
+ struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy,
+ "Network: TEvBunchOfEventsToDestroy")
+
+ TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events)
+ : Events(std::move(events))
+ {
+ }
+
+ TDeque<TAutoPtr<IEventBase>> Events;
+ };
+
+ struct TEvResolveAddress
+ : public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress")
+
+ TString Address;
+ ui16 Port;
+ };
+
+ struct TEvAddressInfo
+ : public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo")
+
+ NAddr::IRemoteAddrPtr Address;
+ };
+
+ struct TEvResolveError
+ : public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError")
+
+ TString Explain;
+ };
+
+ struct TEvHTTPStreamStatus
+ : public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus,
+ "Network: TEvHTTPStreamStatus")
+ enum EStatus {
+ READY,
+ COMPLETE,
+ ERROR,
+ };
+
+ EStatus Status;
+ TString Error;
+ TString HttpHeaders;
+ };
+
+ struct TEvHTTPSendContent
+ : public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent")
+
+ const char* Data;
+ size_t Len;
+ bool Last;
+ };
+
+ struct TEvConnectWakeup
+ : public TEventLocal<TEvConnectWakeup,
+ ui32(ENetwork::ConnectProtocolWakeup)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup")
+ };
+
+ struct TEvHTTPProtocolRetry
+ : public TEventLocal<TEvHTTPProtocolRetry,
+ ui32(ENetwork::HTTPProtocolRetry)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry,
+ "Protocols: TEvHTTPProtocolRetry")
+ };
+
+ struct TEvLoadMessage
+ : TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> {
+ TEvLoadMessage() = default;
+
+ template <typename TContainer>
TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) {
for (const TActorId& actorId : route) {
- auto* hop = Record.AddHops();
+ auto* hop = Record.AddHops();
if (actorId) {
ActorIdToProto(actorId, hop->MutableNextHop());
}
- }
- Record.SetId(id);
- if (payload) {
+ }
+ Record.SetId(id);
+ if (payload) {
Record.SetPayload(*payload);
- }
+ }
}
template <typename TContainer>
@@ -342,39 +342,39 @@ namespace NActors {
Record.SetId(id);
AddPayload(std::move(payload));
}
- };
+ };
- struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> {
- ui64 ConfirmedByInput; // latest Confirm value from processed input packet
- ui64 NumDataBytes;
+ struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> {
+ ui64 ConfirmedByInput; // latest Confirm value from processed input packet
+ ui64 NumDataBytes;
TDuration Ping;
TEvUpdateFromInputSession(ui64 confirmedByInput, ui64 numDataBytes, TDuration ping)
- : ConfirmedByInput(confirmedByInput)
- , NumDataBytes(numDataBytes)
+ : ConfirmedByInput(confirmedByInput)
+ , NumDataBytes(numDataBytes)
, Ping(ping)
- {
- }
- };
+ {
+ }
+ };
struct TEvConfirmUpdate : TEventLocal<TEvConfirmUpdate, static_cast<ui32>(ENetwork::EvConfirmUpdate)>
{};
- struct TEvSessionBufferSizeRequest : TEventLocal<TEvSessionBufferSizeRequest, static_cast<ui32>(ENetwork::EvSessionBufferSizeRequest)> {
- //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest")
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest");
- };
+ struct TEvSessionBufferSizeRequest : TEventLocal<TEvSessionBufferSizeRequest, static_cast<ui32>(ENetwork::EvSessionBufferSizeRequest)> {
+ //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest")
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest");
+ };
- struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {
+ struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {
TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
- : SessionID(sessionId)
- , BufferSize(outputBufferSize)
- {
- }
+ : SessionID(sessionId)
+ , BufferSize(outputBufferSize)
+ {
+ }
TActorId SessionID;
- ui64 BufferSize;
- };
+ ui64 BufferSize;
+ };
struct TEvProcessPingRequest : TEventLocal<TEvProcessPingRequest, static_cast<ui32>(ENetwork::EvProcessPingRequest)> {
const ui64 Payload;
@@ -400,4 +400,4 @@ namespace NActors {
{}
};
-}
+}
diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h
index 79f1fd1184..225a5243fd 100644
--- a/library/cpp/actors/interconnect/interconnect.h
+++ b/library/cpp/actors/interconnect/interconnect.h
@@ -6,122 +6,122 @@
#include <util/network/address.h>
namespace NActors {
- struct TInterconnectGlobalState: public TThrRefBase {
- TString SelfAddress;
- ui32 SelfPort;
+ struct TInterconnectGlobalState: public TThrRefBase {
+ TString SelfAddress;
+ ui32 SelfPort;
TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time)
- };
+ };
- struct TInterconnectProxySetup: public TThrRefBase {
- // synchronous (session -> proxy)
- struct IProxy : TNonCopyable {
- virtual ~IProxy() {
- }
+ struct TInterconnectProxySetup: public TThrRefBase {
+ // synchronous (session -> proxy)
+ struct IProxy : TNonCopyable {
+ virtual ~IProxy() {
+ }
- virtual void ActivateSession(const TActorContext& ctx) = 0; // session activated
- virtual void DetachSession(const TActorContext& ctx) = 0; // session is dead
- };
+ virtual void ActivateSession(const TActorContext& ctx) = 0; // session activated
+ virtual void DetachSession(const TActorContext& ctx) = 0; // session is dead
+ };
- // synchronous (proxy -> session)
- struct ISession : TNonCopyable {
- virtual ~ISession() {
- }
+ // synchronous (proxy -> session)
+ struct ISession : TNonCopyable {
+ virtual ~ISession() {
+ }
- virtual void DetachSession(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // kill yourself
- virtual void ForwardPacket(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // receive packet for forward
- virtual void Connect(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // begin connection
- virtual bool ReceiveIncomingSession(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // handle incoming session, if returns true - then session is dead and must be recreated with new one
- };
+ virtual void DetachSession(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // kill yourself
+ virtual void ForwardPacket(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // receive packet for forward
+ virtual void Connect(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // begin connection
+ virtual bool ReceiveIncomingSession(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // handle incoming session, if returns true - then session is dead and must be recreated with new one
+ };
- ui32 DestinationNode;
+ ui32 DestinationNode;
- TString StaticAddress; // if set - would be used as main destination address
- int StaticPort;
+ TString StaticAddress; // if set - would be used as main destination address
+ int StaticPort;
- TIntrusivePtr<TInterconnectGlobalState> GlobalState;
+ TIntrusivePtr<TInterconnectGlobalState> GlobalState;
virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls
- virtual TActorSetupCmd CreateAcceptor() = 0;
- };
+ virtual TActorSetupCmd CreateAcceptor() = 0;
+ };
- struct TNameserverSetup {
+ struct TNameserverSetup {
TActorId ServiceID;
- TIntrusivePtr<TInterconnectGlobalState> GlobalState;
- };
+ TIntrusivePtr<TInterconnectGlobalState> GlobalState;
+ };
- struct TTableNameserverSetup: public TThrRefBase {
- struct TNodeInfo {
- TString Address;
- TString Host;
- TString ResolveHost;
- ui16 Port;
+ struct TTableNameserverSetup: public TThrRefBase {
+ struct TNodeInfo {
+ TString Address;
+ TString Host;
+ TString ResolveHost;
+ ui16 Port;
TNodeLocation Location;
- TString& first;
- ui16& second;
+ TString& first;
+ ui16& second;
- TNodeInfo()
- : first(Address)
- , second(Port)
- {
- }
+ TNodeInfo()
+ : first(Address)
+ , second(Port)
+ {
+ }
TNodeInfo(const TNodeInfo&) = default;
- // for testing purposes only
- TNodeInfo(const TString& address, const TString& host, ui16 port)
- : TNodeInfo()
- {
- Address = address;
- Host = host;
- ResolveHost = host;
- Port = port;
- }
-
- TNodeInfo(const TString& address,
- const TString& host,
- const TString& resolveHost,
- ui16 port,
+ // for testing purposes only
+ TNodeInfo(const TString& address, const TString& host, ui16 port)
+ : TNodeInfo()
+ {
+ Address = address;
+ Host = host;
+ ResolveHost = host;
+ Port = port;
+ }
+
+ TNodeInfo(const TString& address,
+ const TString& host,
+ const TString& resolveHost,
+ ui16 port,
const TNodeLocation& location)
- : TNodeInfo()
- {
- Address = address;
- Host = host;
- ResolveHost = resolveHost;
- Port = port;
- Location = location;
- }
-
- // for testing purposes only
- TNodeInfo& operator=(const std::pair<TString, ui32>& pr) {
- Address = pr.first;
- Host = pr.first;
- ResolveHost = pr.first;
- Port = pr.second;
- return *this;
- }
-
- TNodeInfo& operator=(const TNodeInfo& ni) {
- Address = ni.Address;
- Host = ni.Host;
- ResolveHost = ni.ResolveHost;
- Port = ni.Port;
- Location = ni.Location;
- return *this;
- }
- };
-
- TMap<ui32, TNodeInfo> StaticNodeTable;
-
- bool IsEntriesUnique() const;
+ : TNodeInfo()
+ {
+ Address = address;
+ Host = host;
+ ResolveHost = resolveHost;
+ Port = port;
+ Location = location;
+ }
+
+ // for testing purposes only
+ TNodeInfo& operator=(const std::pair<TString, ui32>& pr) {
+ Address = pr.first;
+ Host = pr.first;
+ ResolveHost = pr.first;
+ Port = pr.second;
+ return *this;
+ }
+
+ TNodeInfo& operator=(const TNodeInfo& ni) {
+ Address = ni.Address;
+ Host = ni.Host;
+ ResolveHost = ni.ResolveHost;
+ Port = ni.Port;
+ Location = ni.Location;
+ return *this;
+ }
+ };
+
+ TMap<ui32, TNodeInfo> StaticNodeTable;
+
+ bool IsEntriesUnique() const;
};
- struct TNodeRegistrarSetup {
+ struct TNodeRegistrarSetup {
TActorId ServiceID;
- TIntrusivePtr<TInterconnectGlobalState> GlobalState;
- };
+ TIntrusivePtr<TInterconnectGlobalState> GlobalState;
+ };
TActorId GetNameserviceActorId();
@@ -129,9 +129,9 @@ namespace NActors {
* Const table-lookup based name service
*/
- IActor* CreateNameserverTable(
- const TIntrusivePtr<TTableNameserverSetup>& setup,
- ui32 poolId = 0);
+ IActor* CreateNameserverTable(
+ const TIntrusivePtr<TTableNameserverSetup>& setup,
+ ui32 poolId = 0);
/**
* Name service which can be paired with external discovery service.
diff --git a/library/cpp/actors/interconnect/interconnect_address.cpp b/library/cpp/actors/interconnect/interconnect_address.cpp
index 8dff6ecb42..8f474f5a39 100644
--- a/library/cpp/actors/interconnect/interconnect_address.cpp
+++ b/library/cpp/actors/interconnect/interconnect_address.cpp
@@ -9,9 +9,9 @@
#endif
namespace NInterconnect {
- TAddress::TAddress() {
+ TAddress::TAddress() {
memset(&Addr, 0, sizeof(Addr));
- }
+ }
TAddress::TAddress(NAddr::IRemoteAddr& addr) {
socklen_t len = addr.Len();
@@ -19,42 +19,42 @@ namespace NInterconnect {
memcpy(&Addr.Generic, addr.Addr(), len);
}
- int TAddress::GetFamily() const {
+ int TAddress::GetFamily() const {
return Addr.Generic.sa_family;
- }
+ }
- socklen_t TAddress::Size() const {
+ socklen_t TAddress::Size() const {
switch (Addr.Generic.sa_family) {
- case AF_INET6:
+ case AF_INET6:
return sizeof(sockaddr_in6);
- case AF_INET:
+ case AF_INET:
return sizeof(sockaddr_in);
- default:
- return 0;
- }
- }
+ default:
+ return 0;
+ }
+ }
- sockaddr* TAddress::SockAddr() {
+ sockaddr* TAddress::SockAddr() {
return &Addr.Generic;
}
- const sockaddr* TAddress::SockAddr() const {
+ const sockaddr* TAddress::SockAddr() const {
return &Addr.Generic;
- }
+ }
- ui16 TAddress::GetPort() const {
+ ui16 TAddress::GetPort() const {
switch (Addr.Generic.sa_family) {
- case AF_INET6:
+ case AF_INET6:
return ntohs(Addr.Ipv6.sin6_port);
- case AF_INET:
+ case AF_INET:
return ntohs(Addr.Ipv4.sin_port);
- default:
- return 0;
- }
- }
+ default:
+ return 0;
+ }
+ }
- TString TAddress::ToString() const {
- return GetAddress() + ":" + ::ToString(GetPort());
+ TString TAddress::ToString() const {
+ return GetAddress() + ":" + ::ToString(GetPort());
}
TAddress::TAddress(const char* addr, ui16 port) {
@@ -63,8 +63,8 @@ namespace NInterconnect {
Addr.Ipv6.sin6_port = htons(port);
} else if (inet_pton(Addr.Ipv4.sin_family = AF_INET, addr, &Addr.Ipv4.sin_addr)) {
Addr.Ipv4.sin_port = htons(port);
- }
- }
+ }
+ }
TAddress::TAddress(const TString& addr, ui16 port)
: TAddress(addr.data(), port)
@@ -75,17 +75,17 @@ namespace NInterconnect {
socklen_t size;
switch (Addr.Generic.sa_family) {
- case AF_INET6:
+ case AF_INET6:
std::tie(src, size) = std::make_tuple(&Addr.Ipv6.sin6_addr, INET6_ADDRSTRLEN);
break;
- case AF_INET:
+ case AF_INET:
std::tie(src, size) = std::make_tuple(&Addr.Ipv4.sin_addr, INET_ADDRSTRLEN);
break;
- default:
- return TString();
- }
+ default:
+ return TString();
+ }
char *buffer = static_cast<char*>(alloca(size));
const char *p = inet_ntop(Addr.Generic.sa_family, const_cast<void*>(src), buffer, size);
diff --git a/library/cpp/actors/interconnect/interconnect_address.h b/library/cpp/actors/interconnect/interconnect_address.h
index 9c01381ce3..e9e0faec81 100644
--- a/library/cpp/actors/interconnect/interconnect_address.h
+++ b/library/cpp/actors/interconnect/interconnect_address.h
@@ -6,24 +6,24 @@
#include <util/generic/string.h>
namespace NInterconnect {
- class TAddress {
+ class TAddress {
union {
sockaddr Generic;
sockaddr_in Ipv4;
sockaddr_in6 Ipv6;
} Addr;
- public:
- TAddress();
- TAddress(const char* addr, ui16 port);
+ public:
+ TAddress();
+ TAddress(const char* addr, ui16 port);
TAddress(const TString& addr, ui16 port);
TAddress(NAddr::IRemoteAddr& addr);
- int GetFamily() const;
- socklen_t Size() const;
- ::sockaddr* SockAddr();
- const ::sockaddr* SockAddr() const;
- ui16 GetPort() const;
- TString GetAddress() const;
- TString ToString() const;
+ int GetFamily() const;
+ socklen_t Size() const;
+ ::sockaddr* SockAddr();
+ const ::sockaddr* SockAddr() const;
+ ui16 GetPort() const;
+ TString GetAddress() const;
+ TString ToString() const;
};
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index bb54667af6..a66ba2a154 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -11,11 +11,11 @@
LWTRACE_USING(ACTORLIB_PROVIDER);
namespace NActors {
- DECLARE_WILSON_EVENT(EventSentToSocket);
- DECLARE_WILSON_EVENT(EventReceivedFromSocket);
+ DECLARE_WILSON_EVENT(EventSentToSocket);
+ DECLARE_WILSON_EVENT(EventReceivedFromSocket);
bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
- const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr);
+ const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr);
if (task.GetVirtualFreeAmount() < amount) {
return false;
}
@@ -32,8 +32,8 @@ namespace NActors {
(ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
- part->Channel = ChannelId | TChannelPart::LastPartFlag;
- part->Size = sizeof(TEventDescr);
+ part->Channel = ChannelId | TChannelPart::LastPartFlag;
+ part->Size = sizeof(TEventDescr);
memcpy(part + 1, &event.Descr, sizeof(TEventDescr));
task.AppendBuf(part, amount);
*weightConsumed += amount;
@@ -41,13 +41,13 @@ namespace NActors {
Metrics->UpdateOutputChannelEvents(ChannelId);
return true;
- }
+ }
void TEventOutputChannel::DropConfirmed(ui64 confirm) {
LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages");
for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) {
Pool.Release(NotYetConfirmed, it++);
- }
+ }
}
bool TEventOutputChannel::FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed) {
@@ -147,7 +147,7 @@ namespace NActors {
NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue
State = EState::INITIAL;
return true; // we have processed whole event, signal to the caller
- }
+ }
}
}
@@ -169,7 +169,7 @@ namespace NActors {
Pool.Release(NotYetConfirmed);
for (auto& item : Queue) {
item.ForwardOnNondelivery(false);
- }
+ }
Pool.Release(Queue);
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index e4a4c89ba5..e4a0ae3cda 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -18,18 +18,18 @@
namespace NActors {
#pragma pack(push, 1)
- struct TChannelPart {
- ui16 Channel;
- ui16 Size;
+ struct TChannelPart {
+ ui16 Channel;
+ ui16 Size;
- static constexpr ui16 LastPartFlag = ui16(1) << 15;
+ static constexpr ui16 LastPartFlag = ui16(1) << 15;
TString ToString() const {
return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag)
<< " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false")
<< " Size# " << Size << "}";
}
- };
+ };
#pragma pack(pop)
struct TExSerializedEventTooLarge : std::exception {
@@ -41,13 +41,13 @@ namespace NActors {
};
class TEventOutputChannel : public TInterconnectLoggingBase {
- public:
+ public:
TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
: TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
- , Pool(pool)
+ , Pool(pool)
, PeerNodeId(peerNodeId)
- , ChannelId(id)
+ , ChannelId(id)
, Metrics(std::move(metrics))
, Params(std::move(params))
, MaxSerializedEventSize(maxSerializedEventSize)
@@ -61,33 +61,33 @@ namespace NActors {
const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr);
OutputQueueSize += bytes;
return std::make_pair(bytes, &event);
- }
+ }
void DropConfirmed(ui64 confirm);
bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed);
- bool IsEmpty() const {
+ bool IsEmpty() const {
return Queue.empty();
- }
+ }
bool IsWorking() const {
return !IsEmpty();
}
- ui32 GetQueueSize() const {
- return (ui32)Queue.size();
- }
+ ui32 GetQueueSize() const {
+ return (ui32)Queue.size();
+ }
ui64 GetBufferedAmountOfData() const {
- return OutputQueueSize;
- }
+ return OutputQueueSize;
+ }
void NotifyUndelivered();
- TEventHolderPool& Pool;
+ TEventHolderPool& Pool;
const ui32 PeerNodeId;
- const ui16 ChannelId;
+ const ui16 ChannelId;
std::shared_ptr<IInterconnectMetrics> Metrics;
const TSessionParams Params;
const ui32 MaxSerializedEventSize;
@@ -105,7 +105,7 @@ namespace NActors {
static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr);
- protected:
+ protected:
ui64 OutputQueueSize = 0;
std::list<TEventHolder> Queue;
@@ -120,8 +120,8 @@ namespace NActors {
if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) {
Metrics->UpdateOutputChannelTraffic(ChannelId, amount);
}
- }
+ }
- friend class TInterconnectSessionTCP;
- };
+ friend class TInterconnectSessionTCP;
+ };
}
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index 28c40f46ff..285709a00c 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -23,17 +23,17 @@ namespace NActors {
};
struct TInterconnectSettings {
- TDuration Handshake;
- TDuration DeadPeer;
- TDuration CloseOnIdle;
- ui32 SendBufferDieLimitInMB = 0;
- ui64 OutputBuffersTotalSizeLimitInMB = 0;
- ui32 TotalInflightAmountOfData = 0;
- bool MergePerPeerCounters = false;
+ TDuration Handshake;
+ TDuration DeadPeer;
+ TDuration CloseOnIdle;
+ ui32 SendBufferDieLimitInMB = 0;
+ ui64 OutputBuffersTotalSizeLimitInMB = 0;
+ ui32 TotalInflightAmountOfData = 0;
+ bool MergePerPeerCounters = false;
bool MergePerDataCenterCounters = false;
- ui32 TCPSocketBufferSize = 0;
- TDuration PingPeriod = TDuration::Seconds(3);
- TDuration ForceConfirmPeriod = TDuration::Seconds(1);
+ ui32 TCPSocketBufferSize = 0;
+ TDuration PingPeriod = TDuration::Seconds(3);
+ TDuration ForceConfirmPeriod = TDuration::Seconds(1);
TDuration LostConnection;
TDuration BatchPeriod;
bool BindOnAllAddresses = true;
@@ -54,40 +54,40 @@ namespace NActors {
}
return res;
}
- };
+ };
- struct TChannelSettings {
- ui16 Weight;
- };
+ struct TChannelSettings {
+ ui16 Weight;
+ };
- typedef TMap<ui16, TChannelSettings> TChannelsConfig;
+ typedef TMap<ui16, TChannelSettings> TChannelsConfig;
- using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title,
+ using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title,
TActorSystem* actorSystem, const TActorId& actorId)>;
using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>;
using TUpdateWhiteboardCallback = std::function<void(const TString& peer, bool connected, bool green, bool yellow,
- bool orange, bool red, TActorSystem* actorSystem)>;
+ bool orange, bool red, TActorSystem* actorSystem)>;
- struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> {
+ struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> {
TActorId NameserviceId;
- NMonitoring::TDynamicCounterPtr MonCounters;
+ NMonitoring::TDynamicCounterPtr MonCounters;
std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
TChannelsConfig ChannelsConfig;
TInterconnectSettings Settings;
- TRegisterMonPageCallback RegisterMonPage;
+ TRegisterMonPageCallback RegisterMonPage;
TActorId DestructorId;
std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize;
TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024;
- TString ClusterUUID;
- TVector<TString> AcceptUUID;
+ TString ClusterUUID;
+ TVector<TString> AcceptUUID;
ui64 StartTime = GetCycleCountFast();
- TString TechnicalSelfHostName;
+ TString TechnicalSelfHostName;
TInitWhiteboardCallback InitWhiteboard;
- TUpdateWhiteboardCallback UpdateWhiteboard;
- ui32 HandshakeBallastSize = 0;
- TAtomic StartedSessionKiller = 0;
+ TUpdateWhiteboardCallback UpdateWhiteboard;
+ ui32 HandshakeBallastSize = 0;
+ TAtomic StartedSessionKiller = 0;
TScopeId LocalScopeId;
std::shared_ptr<TEventFilter> EventFilter;
TString Cookie; // unique random identifier of a node instance (generated randomly at every start)
@@ -101,6 +101,6 @@ namespace NActors {
TMaybe<TVersionInfo> VersionInfo;
using TPtr = TIntrusivePtr<TInterconnectProxyCommon>;
- };
+ };
-}
+}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 705d6f90af..9ede998d8e 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -62,15 +62,15 @@ namespace NActors {
ui32 Checksum;
ui32 Size;
- ui32 CalculateChecksum(const void* data, size_t len) const {
+ ui32 CalculateChecksum(const void* data, size_t len) const {
return Crc32cExtendMSanCompatible(Crc32cExtendMSanCompatible(0, &Size, sizeof(Size)), data, len);
}
- void Sign(const void* data, size_t len) {
+ void Sign(const void* data, size_t len) {
Checksum = CalculateChecksum(data, len);
}
- bool Check(const void* data, size_t len) const {
+ bool Check(const void* data, size_t len) const {
return Checksum == CalculateChecksum(data, len);
}
};
@@ -474,7 +474,7 @@ namespace NActors {
if (const ui32 size = Common->HandshakeBallastSize) {
TString ballast(size, 0);
- char* data = ballast.Detach();
+ char* data = ballast.Detach();
for (ui32 i = 0; i < size; ++i) {
data[i] = i;
}
@@ -721,7 +721,7 @@ namespace NActors {
}
}
- template <typename T>
+ template <typename T>
void SendExBlock(const T& proto, const char* what) {
TString data;
Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&data);
@@ -760,7 +760,7 @@ namespace NActors {
Send(GetActorSystem()->InterconnectProxy(PeerNodeId), ev.Release());
}
- template <typename TEvent>
+ template <typename TEvent>
THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
State = std::move(state);
return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline);
@@ -778,7 +778,7 @@ namespace NActors {
return WaitForSpecificEvent<TEvent>(std::move(state));
}
- template <typename T1, typename T2, typename... TOther>
+ template <typename T1, typename T2, typename... TOther>
THolder<IEventHandle> AskProxy(THolder<IEventBase> ev, TString state) {
SendToProxy(std::move(ev));
return WaitForSpecificEvent<T1, T2, TOther...>(std::move(state));
@@ -930,7 +930,7 @@ namespace NActors {
template <typename TDataPtr, typename TSendRecvFunc>
void Process(TDataPtr buffer, size_t len, TSendRecvFunc&& sendRecv, bool read, bool write, TString state) {
Y_VERIFY(Socket);
- NInterconnect::TStreamSocket* sock = Socket.Get();
+ NInterconnect::TStreamSocket* sock = Socket.Get();
ssize_t (NInterconnect::TStreamSocket::*pfn)(TDataPtr, size_t, TString*) const = sendRecv;
size_t processed = 0;
@@ -971,7 +971,7 @@ namespace NActors {
return std::move(response->Get()->Node);
}
- template <typename T>
+ template <typename T>
static THolder<TProgramInfo> GetProgramInfo(const T& proto) {
auto programInfo = MakeHolder<TProgramInfo>();
programInfo->PID = proto.GetProgramPID();
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h
index 1b957570ad..b3c0db6c5d 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.h
+++ b/library/cpp/actors/interconnect/interconnect_handshake.h
@@ -10,10 +10,10 @@
#include "events_local.h"
namespace NActors {
- static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1);
- static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2;
+ static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1);
+ static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2;
- using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>;
+ using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>;
IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
diff --git a/library/cpp/actors/interconnect/interconnect_impl.h b/library/cpp/actors/interconnect/interconnect_impl.h
index 716bef412e..ee29e4d397 100644
--- a/library/cpp/actors/interconnect/interconnect_impl.h
+++ b/library/cpp/actors/interconnect/interconnect_impl.h
@@ -1,5 +1,5 @@
#pragma once
-
+
#include "interconnect.h"
#include <library/cpp/actors/protos/interconnect.pb.h>
#include <library/cpp/actors/core/event_pb.h>
@@ -7,39 +7,39 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NActors {
- // resolve node info
- struct TEvInterconnect::TEvResolveNode: public TEventPB<TEvInterconnect::TEvResolveNode, NActorsInterconnect::TEvResolveNode, TEvInterconnect::EvResolveNode> {
- TEvResolveNode() {
- }
+ // resolve node info
+ struct TEvInterconnect::TEvResolveNode: public TEventPB<TEvInterconnect::TEvResolveNode, NActorsInterconnect::TEvResolveNode, TEvInterconnect::EvResolveNode> {
+ TEvResolveNode() {
+ }
TEvResolveNode(ui32 nodeId, TInstant deadline = TInstant::Max()) {
- Record.SetNodeId(nodeId);
+ Record.SetNodeId(nodeId);
if (deadline != TInstant::Max()) {
Record.SetDeadline(deadline.GetValue());
}
- }
- };
-
- // node info
- struct TEvInterconnect::TEvNodeAddress: public TEventPB<TEvInterconnect::TEvNodeAddress, NActorsInterconnect::TEvNodeInfo, TEvInterconnect::EvNodeAddress> {
- TEvNodeAddress() {
- }
-
- TEvNodeAddress(ui32 nodeId) {
- Record.SetNodeId(nodeId);
- }
- };
-
- // register node
- struct TEvInterconnect::TEvRegisterNode: public TEventBase<TEvInterconnect::TEvRegisterNode, TEvInterconnect::EvRegisterNode> {
- };
-
- // reply on register node
- struct TEvInterconnect::TEvRegisterNodeResult: public TEventBase<TEvInterconnect::TEvRegisterNodeResult, TEvInterconnect::EvRegisterNodeResult> {
- };
-
- // disconnect
- struct TEvInterconnect::TEvDisconnect: public TEventLocal<TEvInterconnect::TEvDisconnect, TEvInterconnect::EvDisconnect> {
- };
+ }
+ };
+
+ // node info
+ struct TEvInterconnect::TEvNodeAddress: public TEventPB<TEvInterconnect::TEvNodeAddress, NActorsInterconnect::TEvNodeInfo, TEvInterconnect::EvNodeAddress> {
+ TEvNodeAddress() {
+ }
+
+ TEvNodeAddress(ui32 nodeId) {
+ Record.SetNodeId(nodeId);
+ }
+ };
+
+ // register node
+ struct TEvInterconnect::TEvRegisterNode: public TEventBase<TEvInterconnect::TEvRegisterNode, TEvInterconnect::EvRegisterNode> {
+ };
+
+ // reply on register node
+ struct TEvInterconnect::TEvRegisterNodeResult: public TEventBase<TEvInterconnect::TEvRegisterNodeResult, TEvInterconnect::EvRegisterNodeResult> {
+ };
+
+ // disconnect
+ struct TEvInterconnect::TEvDisconnect: public TEventLocal<TEvInterconnect::TEvDisconnect, TEvInterconnect::EvDisconnect> {
+ };
}
diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
index a2071bbc80..43419bf70d 100644
--- a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
+++ b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
@@ -10,57 +10,57 @@
namespace NActors {
class TInterconnectNameserverTable: public TInterconnectNameserverBase<TInterconnectNameserverTable> {
- TIntrusivePtr<TTableNameserverSetup> Config;
+ TIntrusivePtr<TTableNameserverSetup> Config;
- public:
- static constexpr EActivityType ActorActivityType() {
+ public:
+ static constexpr EActivityType ActorActivityType() {
return NAMESERVICE;
- }
+ }
TInterconnectNameserverTable(const TIntrusivePtr<TTableNameserverSetup>& setup, ui32 /*resolvePoolId*/)
: TInterconnectNameserverBase<TInterconnectNameserverTable>(&TInterconnectNameserverTable::StateFunc, setup->StaticNodeTable)
- , Config(setup)
- {
- Y_VERIFY(Config->IsEntriesUnique());
- }
+ , Config(setup)
+ {
+ Y_VERIFY(Config->IsEntriesUnique());
+ }
- STFUNC(StateFunc) {
- try {
+ STFUNC(StateFunc) {
+ try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvInterconnect::TEvResolveNode, Handle);
HFunc(TEvResolveAddress, Handle);
HFunc(TEvInterconnect::TEvListNodes, Handle);
HFunc(TEvInterconnect::TEvGetNode, Handle);
- }
- } catch (...) {
- // on error - do nothing
+ }
+ } catch (...) {
+ // on error - do nothing
}
}
- };
+ };
IActor* CreateNameserverTable(const TIntrusivePtr<TTableNameserverSetup>& setup, ui32 poolId) {
- return new TInterconnectNameserverTable(setup, poolId);
+ return new TInterconnectNameserverTable(setup, poolId);
}
- bool TTableNameserverSetup::IsEntriesUnique() const {
- TVector<const TNodeInfo*> infos;
- infos.reserve(StaticNodeTable.size());
- for (const auto& x : StaticNodeTable)
- infos.push_back(&x.second);
+ bool TTableNameserverSetup::IsEntriesUnique() const {
+ TVector<const TNodeInfo*> infos;
+ infos.reserve(StaticNodeTable.size());
+ for (const auto& x : StaticNodeTable)
+ infos.push_back(&x.second);
auto CompareAddressLambda =
- [](const TNodeInfo* left, const TNodeInfo* right) {
- return left->Port == right->Port ? left->Address < right->Address : left->Port < right->Port;
- };
+ [](const TNodeInfo* left, const TNodeInfo* right) {
+ return left->Port == right->Port ? left->Address < right->Address : left->Port < right->Port;
+ };
Sort(infos, CompareAddressLambda);
- for (ui32 idx = 1, end = StaticNodeTable.size(); idx < end; ++idx) {
- const TNodeInfo* left = infos[idx - 1];
- const TNodeInfo* right = infos[idx];
+ for (ui32 idx = 1, end = StaticNodeTable.size(); idx < end; ++idx) {
+ const TNodeInfo* left = infos[idx - 1];
+ const TNodeInfo* right = infos[idx];
if (left->Address && left->Address == right->Address && left->Port == right->Port)
- return false;
- }
+ return false;
+ }
auto CompareHostLambda =
[](const TNodeInfo* left, const TNodeInfo* right) {
@@ -76,8 +76,8 @@ namespace NActors {
return false;
}
- return true;
- }
+ return true;
+ }
TActorId GetNameserviceActorId() {
return TActorId(0, "namesvc");
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp
index 52e4491cec..158ebc9e1d 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.cpp
+++ b/library/cpp/actors/interconnect/interconnect_stream.cpp
@@ -23,74 +23,74 @@
#include <cerrno>
namespace NInterconnect {
- namespace {
- inline int
- LastSocketError() {
+ namespace {
+ inline int
+ LastSocketError() {
#if defined(_win_)
- return WSAGetLastError();
+ return WSAGetLastError();
#else
- return errno;
+ return errno;
#endif
- }
- }
-
- TSocket::TSocket(SOCKET fd)
- : Descriptor(fd)
- {
- }
-
- TSocket::~TSocket() {
- if (Descriptor == INVALID_SOCKET) {
- return;
- }
-
- auto const result = ::closesocket(Descriptor);
- if (result == 0)
- return;
- switch (LastSocketError()) {
- case EBADF:
- Y_FAIL("Close bad descriptor");
- case EINTR:
- break;
- case EIO:
- Y_FAIL("EIO");
- default:
- Y_FAIL("It's something unexpected");
- }
- }
-
- int TSocket::GetDescriptor() {
- return Descriptor;
- }
-
- int
- TSocket::Bind(const TAddress& addr) const {
- const auto ret = ::bind(Descriptor, addr.SockAddr(), addr.Size());
- if (ret < 0)
- return -LastSocketError();
-
- return 0;
- }
-
- int
- TSocket::Shutdown(int how) const {
- const auto ret = ::shutdown(Descriptor, how);
- if (ret < 0)
- return -LastSocketError();
-
- return 0;
- }
-
- int TSocket::GetConnectStatus() const {
- int err = 0;
- socklen_t len = sizeof(err);
- if (getsockopt(Descriptor, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&err), &len) == -1) {
- err = LastSocketError();
- }
- return err;
- }
-
- /////////////////////////////////////////////////////////////////
+ }
+ }
+
+ TSocket::TSocket(SOCKET fd)
+ : Descriptor(fd)
+ {
+ }
+
+ TSocket::~TSocket() {
+ if (Descriptor == INVALID_SOCKET) {
+ return;
+ }
+
+ auto const result = ::closesocket(Descriptor);
+ if (result == 0)
+ return;
+ switch (LastSocketError()) {
+ case EBADF:
+ Y_FAIL("Close bad descriptor");
+ case EINTR:
+ break;
+ case EIO:
+ Y_FAIL("EIO");
+ default:
+ Y_FAIL("It's something unexpected");
+ }
+ }
+
+ int TSocket::GetDescriptor() {
+ return Descriptor;
+ }
+
+ int
+ TSocket::Bind(const TAddress& addr) const {
+ const auto ret = ::bind(Descriptor, addr.SockAddr(), addr.Size());
+ if (ret < 0)
+ return -LastSocketError();
+
+ return 0;
+ }
+
+ int
+ TSocket::Shutdown(int how) const {
+ const auto ret = ::shutdown(Descriptor, how);
+ if (ret < 0)
+ return -LastSocketError();
+
+ return 0;
+ }
+
+ int TSocket::GetConnectStatus() const {
+ int err = 0;
+ socklen_t len = sizeof(err);
+ if (getsockopt(Descriptor, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&err), &len) == -1) {
+ err = LastSocketError();
+ }
+ return err;
+ }
+
+ /////////////////////////////////////////////////////////////////
TIntrusivePtr<TStreamSocket> TStreamSocket::Make(int domain) {
const SOCKET res = ::socket(domain, SOCK_STREAM | SOCK_NONBLOCK, 0);
@@ -99,106 +99,106 @@ namespace NInterconnect {
Y_VERIFY(err != EMFILE && err != ENFILE);
}
return MakeIntrusive<TStreamSocket>(res);
- }
+ }
- TStreamSocket::TStreamSocket(SOCKET fd)
- : TSocket(fd)
- {
- }
+ TStreamSocket::TStreamSocket(SOCKET fd)
+ : TSocket(fd)
+ {
+ }
- ssize_t
+ ssize_t
TStreamSocket::Send(const void* msg, size_t len, TString* /*err*/) const {
const auto ret = ::send(Descriptor, static_cast<const char*>(msg), int(len), 0);
- if (ret < 0)
- return -LastSocketError();
+ if (ret < 0)
+ return -LastSocketError();
- return ret;
- }
+ return ret;
+ }
- ssize_t
+ ssize_t
TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const {
const auto ret = ::recv(Descriptor, static_cast<char*>(buf), int(len), 0);
- if (ret < 0)
- return -LastSocketError();
+ if (ret < 0)
+ return -LastSocketError();
- return ret;
- }
+ return ret;
+ }
- ssize_t
- TStreamSocket::WriteV(const struct iovec* iov, int iovcnt) const {
+ ssize_t
+ TStreamSocket::WriteV(const struct iovec* iov, int iovcnt) const {
#ifndef _win_
- const auto ret = ::writev(Descriptor, iov, iovcnt);
- if (ret < 0)
- return -LastSocketError();
- return ret;
+ const auto ret = ::writev(Descriptor, iov, iovcnt);
+ if (ret < 0)
+ return -LastSocketError();
+ return ret;
#else
Y_FAIL("WriteV() unsupported on Windows");
#endif
- }
+ }
- ssize_t
- TStreamSocket::ReadV(const struct iovec* iov, int iovcnt) const {
+ ssize_t
+ TStreamSocket::ReadV(const struct iovec* iov, int iovcnt) const {
#ifndef _win_
- const auto ret = ::readv(Descriptor, iov, iovcnt);
- if (ret < 0)
- return -LastSocketError();
- return ret;
+ const auto ret = ::readv(Descriptor, iov, iovcnt);
+ if (ret < 0)
+ return -LastSocketError();
+ return ret;
#else
Y_FAIL("ReadV() unsupported on Windows");
#endif
- }
+ }
- ssize_t TStreamSocket::GetUnsentQueueSize() const {
- int num = -1;
+ ssize_t TStreamSocket::GetUnsentQueueSize() const {
+ int num = -1;
#ifndef _win_ // we have no means to determine output queue size on Windows
- if (ioctl(Descriptor, TIOCOUTQ, &num) == -1) {
- num = -1;
- }
-#endif
- return num;
- }
-
- int
- TStreamSocket::Connect(const TAddress& addr) const {
- const auto ret = ::connect(Descriptor, addr.SockAddr(), addr.Size());
- if (ret < 0)
- return -LastSocketError();
-
- return ret;
- }
-
- int
- TStreamSocket::Connect(const NAddr::IRemoteAddr* addr) const {
- const auto ret = ::connect(Descriptor, addr->Addr(), addr->Len());
- if (ret < 0)
- return -LastSocketError();
-
- return ret;
- }
-
- int
- TStreamSocket::Listen(int backlog) const {
- const auto ret = ::listen(Descriptor, backlog);
- if (ret < 0)
- return -LastSocketError();
-
- return ret;
- }
-
- int
- TStreamSocket::Accept(TAddress& acceptedAddr) const {
- socklen_t acceptedSize = sizeof(::sockaddr_in6);
- const auto ret = ::accept(Descriptor, acceptedAddr.SockAddr(), &acceptedSize);
- if (ret == INVALID_SOCKET)
- return -LastSocketError();
-
- return ret;
- }
-
- void
- TStreamSocket::SetSendBufferSize(i32 len) const {
- (void)SetSockOpt(Descriptor, SOL_SOCKET, SO_SNDBUF, len);
- }
+ if (ioctl(Descriptor, TIOCOUTQ, &num) == -1) {
+ num = -1;
+ }
+#endif
+ return num;
+ }
+
+ int
+ TStreamSocket::Connect(const TAddress& addr) const {
+ const auto ret = ::connect(Descriptor, addr.SockAddr(), addr.Size());
+ if (ret < 0)
+ return -LastSocketError();
+
+ return ret;
+ }
+
+ int
+ TStreamSocket::Connect(const NAddr::IRemoteAddr* addr) const {
+ const auto ret = ::connect(Descriptor, addr->Addr(), addr->Len());
+ if (ret < 0)
+ return -LastSocketError();
+
+ return ret;
+ }
+
+ int
+ TStreamSocket::Listen(int backlog) const {
+ const auto ret = ::listen(Descriptor, backlog);
+ if (ret < 0)
+ return -LastSocketError();
+
+ return ret;
+ }
+
+ int
+ TStreamSocket::Accept(TAddress& acceptedAddr) const {
+ socklen_t acceptedSize = sizeof(::sockaddr_in6);
+ const auto ret = ::accept(Descriptor, acceptedAddr.SockAddr(), &acceptedSize);
+ if (ret == INVALID_SOCKET)
+ return -LastSocketError();
+
+ return ret;
+ }
+
+ void
+ TStreamSocket::SetSendBufferSize(i32 len) const {
+ (void)SetSockOpt(Descriptor, SOL_SOCKET, SO_SNDBUF, len);
+ }
ui32 TStreamSocket::GetSendBufferSize() const {
ui32 res = 0;
@@ -206,7 +206,7 @@ namespace NInterconnect {
return res;
}
- //////////////////////////////////////////////////////
+ //////////////////////////////////////////////////////
TDatagramSocket::TPtr TDatagramSocket::Make(int domain) {
const SOCKET res = ::socket(domain, SOCK_DGRAM, 0);
@@ -215,31 +215,31 @@ namespace NInterconnect {
Y_VERIFY(err != EMFILE && err != ENFILE);
}
return std::make_shared<TDatagramSocket>(res);
- }
-
- TDatagramSocket::TDatagramSocket(SOCKET fd)
- : TSocket(fd)
- {
- }
-
- ssize_t
- TDatagramSocket::SendTo(const void* msg, size_t len, const TAddress& toAddr) const {
- const auto ret = ::sendto(Descriptor, static_cast<const char*>(msg), int(len), 0, toAddr.SockAddr(), toAddr.Size());
- if (ret < 0)
- return -LastSocketError();
-
- return ret;
- }
-
- ssize_t
- TDatagramSocket::RecvFrom(void* buf, size_t len, TAddress& fromAddr) const {
- socklen_t fromSize = sizeof(::sockaddr_in6);
- const auto ret = ::recvfrom(Descriptor, static_cast<char*>(buf), int(len), 0, fromAddr.SockAddr(), &fromSize);
- if (ret < 0)
- return -LastSocketError();
-
- return ret;
- }
+ }
+
+ TDatagramSocket::TDatagramSocket(SOCKET fd)
+ : TSocket(fd)
+ {
+ }
+
+ ssize_t
+ TDatagramSocket::SendTo(const void* msg, size_t len, const TAddress& toAddr) const {
+ const auto ret = ::sendto(Descriptor, static_cast<const char*>(msg), int(len), 0, toAddr.SockAddr(), toAddr.Size());
+ if (ret < 0)
+ return -LastSocketError();
+
+ return ret;
+ }
+
+ ssize_t
+ TDatagramSocket::RecvFrom(void* buf, size_t len, TAddress& fromAddr) const {
+ socklen_t fromSize = sizeof(::sockaddr_in6);
+ const auto ret = ::recvfrom(Descriptor, static_cast<char*>(buf), int(len), 0, fromAddr.SockAddr(), &fromSize);
+ if (ret < 0)
+ return -LastSocketError();
+
+ return ret;
+ }
// deleter for SSL objects
diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h
index af7176b5f0..074adc6e74 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.h
+++ b/library/cpp/actors/interconnect/interconnect_stream.h
@@ -15,15 +15,15 @@
#include <sys/uio.h>
namespace NInterconnect {
- class TSocket: public NActors::TSharedDescriptor, public TNonCopyable {
- protected:
- TSocket(SOCKET fd);
+ class TSocket: public NActors::TSharedDescriptor, public TNonCopyable {
+ protected:
+ TSocket(SOCKET fd);
- virtual ~TSocket() override;
+ virtual ~TSocket() override;
SOCKET Descriptor;
- virtual int GetDescriptor() override;
+ virtual int GetDescriptor() override;
private:
friend class TSecureSocket;
@@ -32,21 +32,21 @@ namespace NInterconnect {
return std::exchange(Descriptor, INVALID_SOCKET);
}
- public:
- operator SOCKET() const {
- return Descriptor;
- }
+ public:
+ operator SOCKET() const {
+ return Descriptor;
+ }
- int Bind(const TAddress& addr) const;
- int Shutdown(int how) const;
- int GetConnectStatus() const;
- };
+ int Bind(const TAddress& addr) const;
+ int Shutdown(int how) const;
+ int GetConnectStatus() const;
+ };
- class TStreamSocket: public TSocket {
- public:
- TStreamSocket(SOCKET fd);
+ class TStreamSocket: public TSocket {
+ public:
+ TStreamSocket(SOCKET fd);
- static TIntrusivePtr<TStreamSocket> Make(int domain);
+ static TIntrusivePtr<TStreamSocket> Make(int domain);
virtual ssize_t Send(const void* msg, size_t len, TString *err = nullptr) const;
virtual ssize_t Recv(void* buf, size_t len, TString *err = nullptr) const;
@@ -54,16 +54,16 @@ namespace NInterconnect {
virtual ssize_t WriteV(const struct iovec* iov, int iovcnt) const;
virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const;
- int Connect(const TAddress& addr) const;
- int Connect(const NAddr::IRemoteAddr* addr) const;
- int Listen(int backlog) const;
- int Accept(TAddress& acceptedAddr) const;
+ int Connect(const TAddress& addr) const;
+ int Connect(const NAddr::IRemoteAddr* addr) const;
+ int Listen(int backlog) const;
+ int Accept(TAddress& acceptedAddr) const;
- ssize_t GetUnsentQueueSize() const;
+ ssize_t GetUnsentQueueSize() const;
- void SetSendBufferSize(i32 len) const;
+ void SetSendBufferSize(i32 len) const;
ui32 GetSendBufferSize() const;
- };
+ };
class TSecureSocketContext {
class TImpl;
@@ -116,16 +116,16 @@ namespace NInterconnect {
bool WantWrite() const;
};
- class TDatagramSocket: public TSocket {
- public:
- typedef std::shared_ptr<TDatagramSocket> TPtr;
+ class TDatagramSocket: public TSocket {
+ public:
+ typedef std::shared_ptr<TDatagramSocket> TPtr;
- TDatagramSocket(SOCKET fd);
+ TDatagramSocket(SOCKET fd);
- static TPtr Make(int domain);
+ static TPtr Make(int domain);
- ssize_t SendTo(const void* msg, size_t len, const TAddress& toAddr) const;
- ssize_t RecvFrom(void* buf, size_t len, TAddress& fromAddr) const;
- };
+ ssize_t SendTo(const void* msg, size_t len, const TAddress& toAddr) const;
+ ssize_t RecvFrom(void* buf, size_t len, TAddress& fromAddr) const;
+ };
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 19a1ed3f57..0abe9fe659 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -339,7 +339,7 @@ namespace NActors {
PreallocateBuffers();
TStackVec<TIoVec, NumPreallocatedBuffers> buffs;
- for (const auto& item : Buffers) {
+ for (const auto& item : Buffers) {
TIoVec iov{item->GetBuffer(), item->GetCapacity()};
buffs.push_back(iov);
if (Params.Encryption) {
@@ -347,7 +347,7 @@ namespace NActors {
}
}
- const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data());
+ const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data());
int iovcnt = buffs.size();
ssize_t recvres = 0;
@@ -473,4 +473,4 @@ namespace NActors {
}
-}
+}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 65c8ae6fa5..7e2d8ccb94 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -7,17 +7,17 @@
#include <util/system/getpid.h>
namespace NActors {
- static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5);
+ static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5);
- static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
- static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10);
- static constexpr ui32 SleepRetryMultiplier = 4;
+ static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
+ static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10);
+ static constexpr ui32 SleepRetryMultiplier = 4;
- static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) {
- TStringBuf token;
- TStringBuf(longName).NextTok('.', token);
- return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port);
- }
+ static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) {
+ TStringBuf token;
+ TStringBuf(longName).NextTok('.', token);
+ return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port);
+ }
TInterconnectProxyTCP::TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common,
IActor **dynamicPtr)
@@ -27,14 +27,14 @@ namespace NActors {
, Common(std::move(common))
, SecureContext(new NInterconnect::TSecureSocketContext(Common->Settings.Certificate, Common->Settings.PrivateKey,
Common->Settings.CaFilePath, Common->Settings.CipherList))
- {
+ {
Y_VERIFY(Common);
Y_VERIFY(Common->NameserviceId);
if (DynamicPtr) {
Y_VERIFY(!*DynamicPtr);
*DynamicPtr = this;
}
- }
+ }
void TInterconnectProxyTCP::Bootstrap() {
SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId));
@@ -54,20 +54,20 @@ namespace NActors {
TString path = Sprintf("peer%04" PRIu32, PeerNodeId);
TString title = Sprintf("Peer #%04" PRIu32, PeerNodeId);
mon(path, title, sys, SelfId());
- }
+ }
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingActivation
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingActivation
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void TInterconnectProxyTCP::RequestNodeInfo(STATEFN_SIG) {
ICPROXY_PROFILED;
- Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents);
+ Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents);
EnqueueSessionEvent(ev);
StartConfiguring();
- }
+ }
void TInterconnectProxyTCP::RequestNodeInfoForIncomingHandshake(STATEFN_SIG) {
ICPROXY_PROFILED;
@@ -77,37 +77,37 @@ namespace NActors {
EnqueueIncomingHandshakeEvent(ev);
StartConfiguring();
}
- }
+ }
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingNodeInfo
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingNodeInfo
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void TInterconnectProxyTCP::StartConfiguring() {
ICPROXY_PROFILED;
- Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
+ Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
- // issue node info request
+ // issue node info request
Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId));
- // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other
- // wakeup events in flight
+ // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other
+ // wakeup events in flight
SwitchToState(__LINE__, "PendingNodeInfo", &TThis::PendingNodeInfo, GetNodeRequestTimeout,
- ConfigureTimeoutCookie = new TEvents::TEvWakeup);
- }
+ ConfigureTimeoutCookie = new TEvents::TEvWakeup);
+ }
void TInterconnectProxyTCP::Configure(TEvInterconnect::TEvNodeInfo::TPtr& ev) {
ICPROXY_PROFILED;
Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !Session);
- if (!ev->Get()->Node) {
+ if (!ev->Get()->Node) {
TransitToErrorState("cannot get node info");
- } else {
- auto& info = *ev->Get()->Node;
+ } else {
+ auto& info = *ev->Get()->Node;
TString name = PeerNameForHuman(PeerNodeId, info.Host, info.Port);
- TechnicalPeerHostName = info.Host;
+ TechnicalPeerHostName = info.Host;
if (!Metrics) {
Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common);
}
@@ -122,9 +122,9 @@ namespace NActors {
void TInterconnectProxyTCP::ConfigureTimeout(TEvents::TEvWakeup::TPtr& ev) {
ICPROXY_PROFILED;
- if (ev->Get() == ConfigureTimeoutCookie) {
+ if (ev->Get() == ConfigureTimeoutCookie) {
TransitToErrorState("timed out while waiting for node info");
- }
+ }
}
void TInterconnectProxyTCP::ProcessConfigured() {
@@ -152,72 +152,72 @@ namespace NActors {
void TInterconnectProxyTCP::StartInitialHandshake() {
ICPROXY_PROFILED;
- // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any
+ // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any
DropHandshakes();
- // create and register handshake actor
+ // create and register handshake actor
OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, GenerateSessionVirtualId(),
TActorId(), PeerNodeId, 0, TechnicalPeerHostName, TSessionParams()), TMailboxType::ReadAsFilled);
OutgoingHandshakeActorCreated = TActivationContext::Now();
// prepare for new handshake
PrepareNewSessionHandshake();
- }
+ }
void TInterconnectProxyTCP::StartResumeHandshake(ui64 inputCounter) {
ICPROXY_PROFILED;
- // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful
+ // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful
DropOutgoingHandshake();
- // ensure that we have session
- Y_VERIFY(Session);
+ // ensure that we have session
+ Y_VERIFY(Session);
- // ensure that we have both virtual ids
- Y_VERIFY(SessionVirtualId);
- Y_VERIFY(RemoteSessionVirtualId);
+ // ensure that we have both virtual ids
+ Y_VERIFY(SessionVirtualId);
+ Y_VERIFY(RemoteSessionVirtualId);
- // create and register handshake actor
+ // create and register handshake actor
OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, SessionVirtualId,
RemoteSessionVirtualId, PeerNodeId, inputCounter, TechnicalPeerHostName, Session->Params),
TMailboxType::ReadAsFilled);
OutgoingHandshakeActorCreated = TActivationContext::Now();
- }
+ }
void TInterconnectProxyTCP::IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
THolder<IEventBase> event) {
ICPROXY_PROFILED;
- Y_VERIFY(!IncomingHandshakeActor);
- IncomingHandshakeActor = handshakeId;
+ Y_VERIFY(!IncomingHandshakeActor);
+ IncomingHandshakeActor = handshakeId;
IncomingHandshakeActorFilledIn = TActivationContext::Now();
Y_VERIFY(!LastSerialFromIncomingHandshake || *LastSerialFromIncomingHandshake <= peerLocalId);
LastSerialFromIncomingHandshake = peerLocalId;
if (OutgoingHandshakeActor && SelfId().NodeId() < PeerNodeId) {
- // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake
- // incoming handshake must be held till outgoing handshake is complete or failed
+ // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake
+ // incoming handshake must be held till outgoing handshake is complete or failed
LOG_DEBUG_IC("ICP06", "reply for incoming handshake (actor %s) is held", IncomingHandshakeActor.ToString().data());
- HeldHandshakeReply = std::move(event);
+ HeldHandshakeReply = std::move(event);
- // Check that we are in one of acceptable states that would properly handle handshake statuses.
- const auto state = CurrentStateFunc();
- Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State);
- } else {
- LOG_DEBUG_IC("ICP07", "issued incoming handshake reply");
+ // Check that we are in one of acceptable states that would properly handle handshake statuses.
+ const auto state = CurrentStateFunc();
+ Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State);
+ } else {
+ LOG_DEBUG_IC("ICP07", "issued incoming handshake reply");
- // No race, so we can send reply immediately.
- Y_VERIFY(!HeldHandshakeReply);
+ // No race, so we can send reply immediately.
+ Y_VERIFY(!HeldHandshakeReply);
Send(IncomingHandshakeActor, event.Release());
- // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't
- // switch from working state.
- if (!Session) {
+ // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't
+ // switch from working state.
+ if (!Session) {
LOG_INFO_IC("ICP08", "No active sessions, becoming PendingConnection");
SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection);
- } else {
- Y_VERIFY(CurrentStateFunc() == &TThis::StateWork);
- }
+ } else {
+ Y_VERIFY(CurrentStateFunc() == &TThis::StateWork);
+ }
}
}
@@ -243,8 +243,8 @@ namespace NActors {
} else {
// if we already have incoming handshake, then terminate existing one
DropIncomingHandshake();
-
- // issue reply to the sender, possibly holding it while outgoing handshake is at race
+
+ // issue reply to the sender, possibly holding it while outgoing handshake is at race
THolder<IEventBase> reply = IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::ProcessHandshakeRequest, ev);
return IssueIncomingHandshakeReply(ev->Sender, RemoteSessionVirtualId.LocalId(), std::move(reply));
}
@@ -263,27 +263,27 @@ namespace NActors {
ui64 remoteStartTime = record.GetProgramStartTime();
ui64 remoteSerial = record.GetSerial();
- if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) {
- if (remoteSerial < RemoteProgramInfo->Serial) {
+ if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) {
+ if (remoteSerial < RemoteProgramInfo->Serial) {
LOG_INFO_IC("ICP18", "handshake (actor %s) is too old", ev->Sender.ToString().data());
Send(ev->Sender, new TEvents::TEvPoisonPill);
- return;
- } else {
- RemoteProgramInfo->Serial = remoteSerial;
- }
+ return;
+ } else {
+ RemoteProgramInfo->Serial = remoteSerial;
+ }
} else {
- const auto ptr = new TProgramInfo;
- ptr->PID = remotePID;
- ptr->StartTime = remoteStartTime;
- ptr->Serial = remoteSerial;
- RemoteProgramInfo.Reset(ptr);
+ const auto ptr = new TProgramInfo;
+ ptr->PID = remotePID;
+ ptr->StartTime = remoteStartTime;
+ ptr->Serial = remoteSerial;
+ RemoteProgramInfo.Reset(ptr);
}
- /* Let's check peer technical hostname */
+ /* Let's check peer technical hostname */
if (record.HasSenderHostName() && TechnicalPeerHostName != record.GetSenderHostName()) {
Send(ev->Sender, new TEvHandshakeReplyError("host name mismatch"));
- return;
- }
+ return;
+ }
// check sender actor id and check if it is not very old
if (LastSerialFromIncomingHandshake) {
@@ -303,60 +303,60 @@ namespace NActors {
}
}
- // drop incoming handshake as this is definitely more recent
+ // drop incoming handshake as this is definitely more recent
DropIncomingHandshake();
- // prepare for new session
+ // prepare for new session
PrepareNewSessionHandshake();
- auto event = MakeHolder<TEvHandshakeReplyOK>();
- auto* pb = event->Record.MutableSuccess();
+ auto event = MakeHolder<TEvHandshakeReplyOK>();
+ auto* pb = event->Record.MutableSuccess();
const TActorId virtualId = GenerateSessionVirtualId();
- pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
+ pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
pb->SetSenderActorId(virtualId.ToString());
- pb->SetProgramPID(GetPID());
+ pb->SetProgramPID(GetPID());
pb->SetProgramStartTime(Common->StartTime);
- pb->SetSerial(virtualId.LocalId());
+ pb->SetSerial(virtualId.LocalId());
IssueIncomingHandshakeReply(ev->Sender, 0, std::move(event));
- }
+ }
void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeDone::TPtr& ev) {
ICPROXY_PROFILED;
TEvHandshakeDone *msg = ev->Get();
- // Terminate handshake actor working in opposite direction, if set up.
- if (ev->Sender == IncomingHandshakeActor) {
- LOG_INFO_IC("ICP19", "incoming handshake succeeded");
+ // Terminate handshake actor working in opposite direction, if set up.
+ if (ev->Sender == IncomingHandshakeActor) {
+ LOG_INFO_IC("ICP19", "incoming handshake succeeded");
DropIncomingHandshake(false);
DropOutgoingHandshake();
- } else if (ev->Sender == OutgoingHandshakeActor) {
- LOG_INFO_IC("ICP20", "outgoing handshake succeeded");
+ } else if (ev->Sender == OutgoingHandshakeActor) {
+ LOG_INFO_IC("ICP20", "outgoing handshake succeeded");
DropIncomingHandshake();
DropOutgoingHandshake(false);
- } else {
+ } else {
/* It seems to be an old handshake. */
- return;
+ return;
}
- Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
+ Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
SwitchToState(__LINE__, "StateWork", &TThis::StateWork);
- if (Session) {
+ if (Session) {
// this is continuation request, check that virtual ids match
Y_VERIFY(SessionVirtualId == msg->Self && RemoteSessionVirtualId == msg->Peer);
- } else {
+ } else {
// this is initial request, check that we have virtual ids not filled in
Y_VERIFY(!SessionVirtualId && !RemoteSessionVirtualId);
- }
+ }
auto error = [&](const char* description) {
TransitToErrorState(description);
};
- // If session is not created, then create new one.
- if (!Session) {
+ // If session is not created, then create new one.
+ if (!Session) {
RemoteProgramInfo = std::move(msg->ProgramInfo);
if (!RemoteProgramInfo) {
// we have received resume handshake, but session was closed concurrently while handshaking
@@ -374,15 +374,15 @@ namespace NActors {
// ensure that we have session local/peer virtual ids
Y_VERIFY(Session && SessionVirtualId && RemoteSessionVirtualId);
- // Set up new connection for the session.
+ // Set up new connection for the session.
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::SetNewConnection, ev);
- // Reset retry timer
- HoldByErrorWakeupDuration = TDuration::Zero();
+ // Reset retry timer
+ HoldByErrorWakeupDuration = TDuration::Zero();
- /* Forward all held events */
+ /* Forward all held events */
ProcessPendingSessionEvents();
- }
+ }
void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeFail::TPtr& ev) {
ICPROXY_PROFILED;
@@ -392,42 +392,42 @@ namespace NActors {
(IncomingHandshakeActor && OutgoingHandshakeActor);
LogHandshakeFail(ev, inconclusive);
- if (ev->Sender == IncomingHandshakeActor) {
- LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s",
+ if (ev->Sender == IncomingHandshakeActor) {
+ LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s",
ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), OutgoingHandshakeActor.ToString().data());
DropIncomingHandshake(false);
- } else if (ev->Sender == OutgoingHandshakeActor) {
- LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s",
+ } else if (ev->Sender == OutgoingHandshakeActor) {
+ LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s",
ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), IncomingHandshakeActor.ToString().data(),
- HeldHandshakeReply ? "yes" : "no");
+ HeldHandshakeReply ? "yes" : "no");
DropOutgoingHandshake(false);
- if (IEventBase* reply = HeldHandshakeReply.Release()) {
- Y_VERIFY(IncomingHandshakeActor);
+ if (IEventBase* reply = HeldHandshakeReply.Release()) {
+ Y_VERIFY(IncomingHandshakeActor);
LOG_DEBUG_IC("ICP26", "sent held handshake reply to %s", IncomingHandshakeActor.ToString().data());
Send(IncomingHandshakeActor, reply);
- }
+ }
// if we have no current session, then we have to drop all pending events as the outgoing handshake has failed
ProcessPendingSessionEvents();
- } else {
- /* It seems to be an old fail, just ignore it */
+ } else {
+ /* It seems to be an old fail, just ignore it */
LOG_NOTICE_IC("ICP27", "obsolete handshake fail ignored");
- return;
+ return;
}
if (Metrics) {
Metrics->IncHandshakeFails();
- }
+ }
- if (IncomingHandshakeActor || OutgoingHandshakeActor) {
- // one of handshakes is still going on
- LOG_DEBUG_IC("ICP28", "other handshake is still going on");
- return;
- }
+ if (IncomingHandshakeActor || OutgoingHandshakeActor) {
+ // one of handshakes is still going on
+ LOG_DEBUG_IC("ICP28", "other handshake is still going on");
+ return;
+ }
- switch (ev->Get()->Temporary) {
- case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT:
+ switch (ev->Get()->Temporary) {
+ case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT:
if (!Session) {
if (PendingSessionEvents) {
// try to start outgoing handshake as we have some events enqueued
@@ -444,22 +444,22 @@ namespace NActors {
// we have no active connection in that session, so just restart handshake from last known position
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::StartHandshake);
}
- break;
-
- case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH:
+ break;
+
+ case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH:
StartInitialHandshake();
- break;
-
- case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT:
+ break;
+
+ case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT:
TString timeExplanation = " LastSessionDieTime# " + LastSessionDieTime.ToString();
- if (Session) {
+ if (Session) {
InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate,
TDisconnectReason::HandshakeFailPermanent());
}
TransitToErrorState(ev->Get()->Explanation + timeExplanation, false);
- break;
- }
- }
+ break;
+ }
+ }
void TInterconnectProxyTCP::LogHandshakeFail(TEvHandshakeFail::TPtr& ev, bool inconclusive) {
ICPROXY_PROFILED;
@@ -487,69 +487,69 @@ namespace NActors {
void TInterconnectProxyTCP::ProcessPendingSessionEvents() {
ICPROXY_PROFILED;
- while (PendingSessionEvents) {
+ while (PendingSessionEvents) {
TPendingSessionEvent ev = std::move(PendingSessionEvents.front());
PendingSessionEventsSize -= ev.Size;
TAutoPtr<IEventHandle> event(ev.Event.Release());
- PendingSessionEvents.pop_front();
+ PendingSessionEvents.pop_front();
if (Session) {
ForwardSessionEventToSession(event);
- } else {
+ } else {
DropSessionEvent(event);
}
- }
+ }
}
void TInterconnectProxyTCP::DropSessionEvent(STATEFN_SIG) {
ICPROXY_PROFILED;
ValidateEvent(ev, "DropSessionEvent");
- switch (ev->GetTypeRewrite()) {
- case TEvInterconnect::EvForward:
- if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvInterconnect::EvForward:
+ if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie);
- }
+ }
TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
- break;
+ break;
- case TEvInterconnect::TEvConnectNode::EventType:
- case TEvents::TEvSubscribe::EventType:
+ case TEvInterconnect::TEvConnectNode::EventType:
+ case TEvents::TEvSubscribe::EventType:
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie);
- break;
+ break;
- case TEvents::TEvUnsubscribe::EventType:
- /* Do nothing */
- break;
+ case TEvents::TEvUnsubscribe::EventType:
+ /* Do nothing */
+ break;
- default:
- Y_FAIL("Unexpected type of event in held event queue");
- }
+ default:
+ Y_FAIL("Unexpected type of event in held event queue");
+ }
}
void TInterconnectProxyTCP::UnregisterSession(TInterconnectSessionTCP* session) {
ICPROXY_PROFILED;
- Y_VERIFY(Session && Session == session && SessionID);
+ Y_VERIFY(Session && Session == session && SessionID);
LOG_INFO_IC("ICP30", "unregister session Session# %s VirtualId# %s", SessionID.ToString().data(),
SessionVirtualId.ToString().data());
- Session = nullptr;
+ Session = nullptr;
SessionID = TActorId();
- // drop all pending events as we are closed
+ // drop all pending events as we are closed
ProcessPendingSessionEvents();
- // reset virtual ids as this session is terminated
+ // reset virtual ids as this session is terminated
SessionVirtualId = TActorId();
RemoteSessionVirtualId = TActorId();
if (Metrics) {
Metrics->IncSessionDeaths();
- }
+ }
LastSessionDieTime = TActivationContext::Now();
-
+
if (IncomingHandshakeActor || OutgoingHandshakeActor) {
PrepareNewSessionHandshake();
} else {
@@ -566,22 +566,22 @@ namespace NActors {
PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev);
ScheduleCleanupEventQueue();
CleanupEventQueue();
- }
+ }
void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(STATEFN_SIG) {
ICPROXY_PROFILED;
// enqueue handshake request
Y_UNUSED();
- PendingIncomingHandshakeEvents.emplace_back(ev);
- }
+ PendingIncomingHandshakeEvents.emplace_back(ev);
+ }
void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeDone::TPtr& /*ev*/) {
ICPROXY_PROFILED;
// TEvHandshakeDone can't get into the queue, because we have to process handshake request first; this may be the
// race with the previous handshakes, so simply ignore it
- }
+ }
void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeFail::TPtr& ev) {
ICPROXY_PROFILED;
@@ -599,140 +599,140 @@ namespace NActors {
break;
}
}
- }
+ }
void TInterconnectProxyTCP::ForwardSessionEventToSession(STATEFN_SIG) {
ICPROXY_PROFILED;
- Y_VERIFY(Session && SessionID);
+ Y_VERIFY(Session && SessionID);
ValidateEvent(ev, "ForwardSessionEventToSession");
InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID));
- }
+ }
void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) {
ICPROXY_PROFILED;
LOG_INFO_IC("ICP31", "proxy http called");
- TStringStream str;
-
- HTML(str) {
- DIV_CLASS("panel panel-info") {
- DIV_CLASS("panel-heading") {
- str << "Proxy";
- }
- DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
- TABLER() {
- TABLEH() {
- str << "Sensor";
- }
- TABLEH() {
- str << "Value";
- }
- }
- }
-#define MON_VAR(NAME) \
- TABLER() { \
- TABLED() { \
- str << #NAME; \
- } \
- TABLED() { \
- str << NAME; \
- } \
- }
-
- TABLEBODY() {
+ TStringStream str;
+
+ HTML(str) {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ str << "Proxy";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() {
+ str << "Sensor";
+ }
+ TABLEH() {
+ str << "Value";
+ }
+ }
+ }
+#define MON_VAR(NAME) \
+ TABLER() { \
+ TABLED() { \
+ str << #NAME; \
+ } \
+ TABLED() { \
+ str << NAME; \
+ } \
+ }
+
+ TABLEBODY() {
MON_VAR(TActivationContext::Now())
- MON_VAR(SessionID)
- MON_VAR(LastSessionDieTime)
- MON_VAR(IncomingHandshakeActor)
- MON_VAR(IncomingHandshakeActorFilledIn)
- MON_VAR(IncomingHandshakeActorReset)
- MON_VAR(OutgoingHandshakeActor)
- MON_VAR(OutgoingHandshakeActorCreated)
- MON_VAR(OutgoingHandshakeActorReset)
- MON_VAR(State)
- MON_VAR(StateSwitchTime)
+ MON_VAR(SessionID)
+ MON_VAR(LastSessionDieTime)
+ MON_VAR(IncomingHandshakeActor)
+ MON_VAR(IncomingHandshakeActorFilledIn)
+ MON_VAR(IncomingHandshakeActorReset)
+ MON_VAR(OutgoingHandshakeActor)
+ MON_VAR(OutgoingHandshakeActorCreated)
+ MON_VAR(OutgoingHandshakeActorReset)
+ MON_VAR(State)
+ MON_VAR(StateSwitchTime)
}
}
}
}
- DIV_CLASS("panel panel-info") {
- DIV_CLASS("panel-heading") {
- str << "Error Log";
- }
- DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ str << "Error Log";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
TABLER() {
- TABLEH() {
- str << "Timestamp";
+ TABLEH() {
+ str << "Timestamp";
}
- TABLEH() {
- str << "Elapsed";
+ TABLEH() {
+ str << "Elapsed";
}
- TABLEH() {
- str << "Kind";
+ TABLEH() {
+ str << "Kind";
}
- TABLEH() {
- str << "Explanation";
+ TABLEH() {
+ str << "Explanation";
}
}
}
- TABLEBODY() {
+ TABLEBODY() {
const TInstant now = TActivationContext::Now();
- const TInstant barrier = now - TDuration::Minutes(1);
- for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) {
- auto wrapper = [&](const auto& lambda) {
- if (std::get<0>(*it) > barrier) {
- str << "<strong>";
- lambda();
- str << "</strong>";
- } else {
- lambda();
- }
- };
- TABLER() {
- TABLED() {
- wrapper([&] {
- str << std::get<0>(*it);
- });
- }
- TABLED() {
- wrapper([&] {
- str << now - std::get<0>(*it);
- });
- }
- TABLED() {
- wrapper([&] {
- str << std::get<1>(*it);
- });
- }
- TABLED() {
- wrapper([&] {
- str << std::get<2>(*it);
- });
+ const TInstant barrier = now - TDuration::Minutes(1);
+ for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) {
+ auto wrapper = [&](const auto& lambda) {
+ if (std::get<0>(*it) > barrier) {
+ str << "<strong>";
+ lambda();
+ str << "</strong>";
+ } else {
+ lambda();
+ }
+ };
+ TABLER() {
+ TABLED() {
+ wrapper([&] {
+ str << std::get<0>(*it);
+ });
+ }
+ TABLED() {
+ wrapper([&] {
+ str << now - std::get<0>(*it);
+ });
+ }
+ TABLED() {
+ wrapper([&] {
+ str << std::get<1>(*it);
+ });
+ }
+ TABLED() {
+ wrapper([&] {
+ str << std::get<2>(*it);
+ });
ui32 rep = std::get<3>(*it);
if (rep != 1) {
str << " <strong>x" << rep << "</strong>";
}
- }
- }
- }
- }
+ }
+ }
+ }
+ }
}
}
}
}
- if (Session != nullptr) {
+ if (Session != nullptr) {
Session->GenerateHttpInfo(str);
- }
-
+ }
+
Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
}
@@ -746,16 +746,16 @@ namespace NActors {
UpdateErrorStateLog(TActivationContext::Now(), "permanent conclusive", explanation);
}
- Y_VERIFY(Session == nullptr);
- Y_VERIFY(!SessionID);
+ Y_VERIFY(Session == nullptr);
+ Y_VERIFY(!SessionID);
- // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we
- // sleep N times longer than the previous try, but not longer than desired number of seconds
- HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero()
- ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep)
- : FirstErrorSleep;
+ // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we
+ // sleep N times longer than the previous try, but not longer than desired number of seconds
+ HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero()
+ ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep)
+ : FirstErrorSleep;
- // transit to required state and arm wakeup timer
+ // transit to required state and arm wakeup timer
if (Terminated) {
// switch to this state permanently
SwitchToState(__LINE__, "HoldByError", &TThis::HoldByError);
@@ -765,21 +765,21 @@ namespace NActors {
HoldByErrorWakeupCookie = new TEvents::TEvWakeup);
}
- /* Process all pending events. */
+ /* Process all pending events. */
ProcessPendingSessionEvents();
- /* Terminate handshakes */
+ /* Terminate handshakes */
DropHandshakes();
- /* Terminate pending incoming handshake requests. */
- for (auto& ev : PendingIncomingHandshakeEvents) {
+ /* Terminate pending incoming handshake requests. */
+ for (auto& ev : PendingIncomingHandshakeEvents) {
Send(ev->Sender, new TEvents::TEvPoisonPill);
if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) {
TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release()));
LogHandshakeFail(tmp, true);
}
- }
- PendingIncomingHandshakeEvents.clear();
+ }
+ PendingIncomingHandshakeEvents.clear();
}
void TInterconnectProxyTCP::WakeupFromErrorState(TEvents::TEvWakeup::TPtr& ev) {
@@ -787,39 +787,39 @@ namespace NActors {
LOG_INFO_IC("ICP33", "wake up from error state");
- if (ev->Get() == HoldByErrorWakeupCookie) {
+ if (ev->Get() == HoldByErrorWakeupCookie) {
SwitchToInitialState();
- }
+ }
}
void TInterconnectProxyTCP::Disconnect() {
ICPROXY_PROFILED;
- // terminate handshakes (if any)
+ // terminate handshakes (if any)
DropHandshakes();
- if (Session) {
+ if (Session) {
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::UserRequest());
- } else {
+ } else {
TransitToErrorState("forced disconnect");
- }
+ }
}
void TInterconnectProxyTCP::ScheduleCleanupEventQueue() {
ICPROXY_PROFILED;
- if (!CleanupEventQueueScheduled && PendingSessionEvents) {
+ if (!CleanupEventQueueScheduled && PendingSessionEvents) {
// apply batching at 50 ms granularity
Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue);
- CleanupEventQueueScheduled = true;
- }
+ CleanupEventQueueScheduled = true;
+ }
}
void TInterconnectProxyTCP::HandleCleanupEventQueue() {
ICPROXY_PROFILED;
- Y_VERIFY(CleanupEventQueueScheduled);
- CleanupEventQueueScheduled = false;
+ Y_VERIFY(CleanupEventQueueScheduled);
+ CleanupEventQueueScheduled = false;
CleanupEventQueue();
ScheduleCleanupEventQueue();
}
@@ -838,24 +838,24 @@ namespace NActors {
} else {
break;
}
- }
+ }
}
void TInterconnectProxyTCP::HandleClosePeerSocket() {
ICPROXY_PROFILED;
- if (Session && Session->Socket) {
+ if (Session && Session->Socket) {
LOG_INFO_IC("ICP34", "closed connection by debug command");
- Session->Socket->Shutdown(SHUT_RDWR);
- }
+ Session->Socket->Shutdown(SHUT_RDWR);
+ }
}
void TInterconnectProxyTCP::HandleCloseInputSession() {
ICPROXY_PROFILED;
- if (Session) {
+ if (Session) {
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::CloseInputSession);
- }
+ }
}
void TInterconnectProxyTCP::HandlePoisonSession() {
@@ -869,11 +869,11 @@ namespace NActors {
void TInterconnectProxyTCP::HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev) {
ICPROXY_PROFILED;
- ui64 bufSize = 0;
- if (Session) {
- bufSize = Session->TotalOutputQueueSize;
- }
-
+ ui64 bufSize = 0;
+ if (Session) {
+ bufSize = Session->TotalOutputQueueSize;
+ }
+
Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize));
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 1f9ce6841c..023e5bd1ee 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -16,22 +16,22 @@
namespace NActors {
- /* WARNING: all proxy actors should be alive during actorsystem activity */
+ /* WARNING: all proxy actors should be alive during actorsystem activity */
class TInterconnectProxyTCP
: public TActor<TInterconnectProxyTCP>
, public TInterconnectLoggingBase
, public TProfiled
{
- enum {
- EvCleanupEventQueue = EventSpaceBegin(TEvents::ES_PRIVATE),
+ enum {
+ EvCleanupEventQueue = EventSpaceBegin(TEvents::ES_PRIVATE),
EvQueryStats,
EvStats,
EvPassAwayIfNeeded,
- };
+ };
- struct TEvCleanupEventQueue : TEventLocal<TEvCleanupEventQueue, EvCleanupEventQueue> {};
+ struct TEvCleanupEventQueue : TEventLocal<TEvCleanupEventQueue, EvCleanupEventQueue> {};
- public:
+ public:
struct TEvQueryStats : TEventLocal<TEvQueryStats, EvQueryStats> {};
struct TProxyStats {
@@ -56,9 +56,9 @@ namespace NActors {
TProxyStats ProxyStats;
};
- static constexpr EActivityType ActorActivityType() {
- return INTERCONNECT_PROXY_TCP;
- }
+ static constexpr EActivityType ActorActivityType() {
+ return INTERCONNECT_PROXY_TCP;
+ }
TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr);
@@ -72,11 +72,11 @@ namespace NActors {
void Bootstrap();
void Registered(TActorSystem* sys, const TActorId& owner) override;
- private:
- friend class TInterconnectSessionTCP;
- friend class TInterconnectSessionTCPv0;
- friend class THandshake;
- friend class TInputSessionTCP;
+ private:
+ friend class TInterconnectSessionTCP;
+ friend class TInterconnectSessionTCPv0;
+ friend class THandshake;
+ friend class TInputSessionTCP;
void UnregisterSession(TInterconnectSessionTCP* session);
@@ -138,46 +138,46 @@ namespace NActors {
} \
}
- template <typename T>
+ template <typename T>
void Ignore(T& /*ev*/) {
ICPROXY_PROFILED;
- }
+ }
void Ignore() {
ICPROXY_PROFILED;
- }
+ }
void Ignore(TEvHandshakeDone::TPtr& ev) {
ICPROXY_PROFILED;
- Y_VERIFY(ev->Sender != IncomingHandshakeActor);
- Y_VERIFY(ev->Sender != OutgoingHandshakeActor);
- }
+ Y_VERIFY(ev->Sender != IncomingHandshakeActor);
+ Y_VERIFY(ev->Sender != OutgoingHandshakeActor);
+ }
void Ignore(TEvHandshakeFail::TPtr& ev) {
ICPROXY_PROFILED;
- Y_VERIFY(ev->Sender != IncomingHandshakeActor);
- Y_VERIFY(ev->Sender != OutgoingHandshakeActor);
+ Y_VERIFY(ev->Sender != IncomingHandshakeActor);
+ Y_VERIFY(ev->Sender != OutgoingHandshakeActor);
LogHandshakeFail(ev, true);
- }
+ }
- const char* State = nullptr;
- TInstant StateSwitchTime;
+ const char* State = nullptr;
+ TInstant StateSwitchTime;
- template <typename... TArgs>
+ template <typename... TArgs>
void SwitchToState(int line, const char* name, TArgs&&... args) {
ICPROXY_PROFILED;
LOG_DEBUG_IC("ICP77", "@%d %s -> %s", line, State, name);
- State = name;
+ State = name;
StateSwitchTime = TActivationContext::Now();
- Become(std::forward<TArgs>(args)...);
+ Become(std::forward<TArgs>(args)...);
Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state
if (CurrentStateFunc() != &TThis::PendingActivation) {
PassAwayTimestamp = TInstant::Max();
}
- }
+ }
TInstant PassAwayTimestamp;
bool PassAwayScheduled = false;
@@ -194,7 +194,7 @@ namespace NActors {
{}, nullptr, 0));
PassAwayScheduled = true;
}
- }
+ }
void HandlePassAwayIfNeeded() {
Y_VERIFY(PassAwayScheduled);
@@ -203,92 +203,92 @@ namespace NActors {
}
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingActivation
- //
- // In this state we are just waiting for some activities, which may include:
- // * an external Session event
- // * incoming handshake request
- //
- // Upon receiving such event, we put it to corresponding queue and initiate start up by calling IssueGetNodeRequest,
- // which, as the name says, issued TEvGetNode to the nameservice and arms timer to handle timeout (which should not
- // occur, but we want to be sure we don't hang on this), and then switches to PendingNodeInfo state.
-
- PROXY_STFUNC(PendingActivation,
- RequestNodeInfo, // Session events
- RequestNodeInfoForIncomingHandshake, // Incoming handshake requests
- Ignore, // Handshake status
- Ignore, // Disconnect request
- Ignore, // Wakeup
- Ignore // Node info
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingActivation
+ //
+ // In this state we are just waiting for some activities, which may include:
+ // * an external Session event
+ // * incoming handshake request
+ //
+ // Upon receiving such event, we put it to corresponding queue and initiate start up by calling IssueGetNodeRequest,
+ // which, as the name says, issued TEvGetNode to the nameservice and arms timer to handle timeout (which should not
+ // occur, but we want to be sure we don't hang on this), and then switches to PendingNodeInfo state.
+
+ PROXY_STFUNC(PendingActivation,
+ RequestNodeInfo, // Session events
+ RequestNodeInfoForIncomingHandshake, // Incoming handshake requests
+ Ignore, // Handshake status
+ Ignore, // Disconnect request
+ Ignore, // Wakeup
+ Ignore // Node info
)
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingNodeInfo
- //
- // This state is entered when we asked nameserver to provide description for peer node we are working with. All
- // external Session events and incoming handshake requests are enqueued into their respective queues, TEvNodeInfo
- // is main event that triggers processing. On success, we try to initiate outgoing handshake if needed, or process
- // incoming handshakes. On error, we enter HoldByError state.
- //
- // NOTE: handshake status events are also enqueued as the handshake actor may have generated failure event due to
- // timeout or some other reason without waiting for acknowledge, and it must be processed correctly to prevent
- // session hang
-
- PROXY_STFUNC(PendingNodeInfo,
- EnqueueSessionEvent, // Session events
- EnqueueIncomingHandshakeEvent, // Incoming handshake requests
- EnqueueIncomingHandshakeEvent, // Handshake status
- Disconnect, // Disconnect request
- ConfigureTimeout, // Wakeup
- Configure // Node info
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingNodeInfo
+ //
+ // This state is entered when we asked nameserver to provide description for peer node we are working with. All
+ // external Session events and incoming handshake requests are enqueued into their respective queues, TEvNodeInfo
+ // is main event that triggers processing. On success, we try to initiate outgoing handshake if needed, or process
+ // incoming handshakes. On error, we enter HoldByError state.
+ //
+ // NOTE: handshake status events are also enqueued as the handshake actor may have generated failure event due to
+ // timeout or some other reason without waiting for acknowledge, and it must be processed correctly to prevent
+ // session hang
+
+ PROXY_STFUNC(PendingNodeInfo,
+ EnqueueSessionEvent, // Session events
+ EnqueueIncomingHandshakeEvent, // Incoming handshake requests
+ EnqueueIncomingHandshakeEvent, // Handshake status
+ Disconnect, // Disconnect request
+ ConfigureTimeout, // Wakeup
+ Configure // Node info
)
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingConnection
- //
- // Here we have issued outgoing handshake or have accepted (or may be both) incoming handshake and we are waiting for
- // the status of the handshake. When one if handshakes finishes, we use this status to establish connection (or to
- // go to error state). When one handshake terminates with error while other is running, we will still wait for the
- // second one to finish.
-
- PROXY_STFUNC(PendingConnection,
- EnqueueSessionEvent, // Session events
- IncomingHandshake, // Incoming handshake requests
- HandleHandshakeStatus, // Handshake status
- Disconnect, // Disconnect request
- Ignore, // Wakeup
- Ignore // Node info
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingConnection
+ //
+ // Here we have issued outgoing handshake or have accepted (or may be both) incoming handshake and we are waiting for
+ // the status of the handshake. When one if handshakes finishes, we use this status to establish connection (or to
+ // go to error state). When one handshake terminates with error while other is running, we will still wait for the
+ // second one to finish.
+
+ PROXY_STFUNC(PendingConnection,
+ EnqueueSessionEvent, // Session events
+ IncomingHandshake, // Incoming handshake requests
+ HandleHandshakeStatus, // Handshake status
+ Disconnect, // Disconnect request
+ Ignore, // Wakeup
+ Ignore // Node info
)
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // StateWork
- //
- // We have accepted session and process any incoming messages with the session. Incoming handshakes are accepted
- // concurrently and applied when finished.
-
- PROXY_STFUNC(StateWork,
- ForwardSessionEventToSession, // Session events
- IncomingHandshake, // Incoming handshake requests
- HandleHandshakeStatus, // Handshake status
- Disconnect, // Disconnect request
- Ignore, // Wakeup
- Ignore // Node info
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // StateWork
+ //
+ // We have accepted session and process any incoming messages with the session. Incoming handshakes are accepted
+ // concurrently and applied when finished.
+
+ PROXY_STFUNC(StateWork,
+ ForwardSessionEventToSession, // Session events
+ IncomingHandshake, // Incoming handshake requests
+ HandleHandshakeStatus, // Handshake status
+ Disconnect, // Disconnect request
+ Ignore, // Wakeup
+ Ignore // Node info
)
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // HoldByError
- //
- // When something bad happens with the connection, we sleep in this state. After wake up we go back to
- // PendingActivation.
-
- PROXY_STFUNC(HoldByError,
- DropSessionEvent, // Session events
- RequestNodeInfoForIncomingHandshake, // Incoming handshake requests
- Ignore, // Handshake status
- Ignore, // Disconnect request
- WakeupFromErrorState, // Wakeup
- Ignore // Node info
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // HoldByError
+ //
+ // When something bad happens with the connection, we sleep in this state. After wake up we go back to
+ // PendingActivation.
+
+ PROXY_STFUNC(HoldByError,
+ DropSessionEvent, // Session events
+ RequestNodeInfoForIncomingHandshake, // Incoming handshake requests
+ Ignore, // Handshake status
+ Ignore, // Disconnect request
+ WakeupFromErrorState, // Wakeup
+ Ignore // Node info
)
#undef SESSION_EVENTS
@@ -317,19 +317,19 @@ namespace NActors {
void StartInitialHandshake();
void StartResumeHandshake(ui64 inputCounter);
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Incoming handshake event queue processing
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Incoming handshake event queue processing
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void EnqueueIncomingHandshakeEvent(STATEFN_SIG);
void EnqueueIncomingHandshakeEvent(TEvHandshakeDone::TPtr& ev);
void EnqueueIncomingHandshakeEvent(TEvHandshakeFail::TPtr& ev);
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingNodeInfo
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingNodeInfo
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- IEventBase* ConfigureTimeoutCookie; // pointer to the scheduled event used to match sent and received events
+ IEventBase* ConfigureTimeoutCookie; // pointer to the scheduled event used to match sent and received events
void StartConfiguring();
void Configure(TEvInterconnect::TEvNodeInfo::TPtr& ev);
@@ -343,7 +343,7 @@ namespace NActors {
void WakeupFromErrorState(TEvents::TEvWakeup::TPtr& ev);
void Disconnect();
- const ui32 PeerNodeId;
+ const ui32 PeerNodeId;
IActor **DynamicPtr;
void ValidateEvent(TAutoPtr<IEventHandle>& ev, const char* func) {
@@ -362,15 +362,15 @@ namespace NActors {
}
// Common with helpers
- // All proxy actors share the same information in the object
- // read only
+ // All proxy actors share the same information in the object
+ // read only
TInterconnectProxyCommon::TPtr const Common;
const TActorId& GetNameserviceId() const {
return Common->NameserviceId;
- }
+ }
- TString TechnicalPeerHostName;
+ TString TechnicalPeerHostName;
std::shared_ptr<IInterconnectMetrics> Metrics;
@@ -380,12 +380,12 @@ namespace NActors {
void HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev);
- bool CleanupEventQueueScheduled = false;
+ bool CleanupEventQueueScheduled = false;
void ScheduleCleanupEventQueue();
void HandleCleanupEventQueue();
void CleanupEventQueue();
- // hold all events before connection is established
+ // hold all events before connection is established
struct TPendingSessionEvent {
TInstant Deadline;
ui32 Size;
@@ -402,12 +402,12 @@ namespace NActors {
void ProcessPendingSessionEvents();
void DropSessionEvent(STATEFN_SIG);
- TInterconnectSessionTCP* Session = nullptr;
+ TInterconnectSessionTCP* Session = nullptr;
TActorId SessionID;
- // virtual ids used during handshake to check if it is the connection
- // for the same session or to find out the latest shandshake
- // it's virtual because session actor apears after successfull handshake
+ // virtual ids used during handshake to check if it is the connection
+ // for the same session or to find out the latest shandshake
+ // it's virtual because session actor apears after successfull handshake
TActorId SessionVirtualId;
TActorId RemoteSessionVirtualId;
@@ -416,27 +416,27 @@ namespace NActors {
const ui64 localId = TlsActivationContext->ExecutorThread.ActorSystem->AllocateIDSpace(1);
return NActors::TActorId(SelfId().NodeId(), 0, localId, 0);
- }
+ }
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
TActorId IncomingHandshakeActor;
- TInstant IncomingHandshakeActorFilledIn;
- TInstant IncomingHandshakeActorReset;
+ TInstant IncomingHandshakeActorFilledIn;
+ TInstant IncomingHandshakeActorReset;
TMaybe<ui64> LastSerialFromIncomingHandshake;
- THolder<IEventBase> HeldHandshakeReply;
+ THolder<IEventBase> HeldHandshakeReply;
void DropIncomingHandshake(bool poison = true) {
ICPROXY_PROFILED;
if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) {
LOG_DEBUG_IC("ICP111", "dropped incoming handshake: %s poison: %s", actorId.ToString().data(),
- poison ? "true" : "false");
- if (poison) {
+ poison ? "true" : "false");
+ if (poison) {
Send(actorId, new TEvents::TEvPoisonPill);
- }
+ }
LastSerialFromIncomingHandshake.Clear();
- HeldHandshakeReply.Reset();
+ HeldHandshakeReply.Reset();
IncomingHandshakeActorReset = TActivationContext::Now();
}
}
@@ -446,10 +446,10 @@ namespace NActors {
if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) {
LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(),
- poison ? "true" : "false");
- if (poison) {
+ poison ? "true" : "false");
+ if (poison) {
Send(actorId, new TEvents::TEvPoisonPill);
- }
+ }
OutgoingHandshakeActorReset = TActivationContext::Now();
}
}
@@ -464,38 +464,38 @@ namespace NActors {
void PrepareNewSessionHandshake() {
ICPROXY_PROFILED;
- // drop existing session if we have one
- if (Session) {
- LOG_INFO_IC("ICP04", "terminating current session as we are negotiating a new one");
+ // drop existing session if we have one
+ if (Session) {
+ LOG_INFO_IC("ICP04", "terminating current session as we are negotiating a new one");
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::NewSession());
- }
+ }
- // ensure we have no current session
- Y_VERIFY(!Session);
+ // ensure we have no current session
+ Y_VERIFY(!Session);
- // switch to pending connection state -- we wait for handshakes, we want more handshakes!
+ // switch to pending connection state -- we wait for handshakes, we want more handshakes!
SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection);
- }
+ }
void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
- THolder<IEventBase> event);
+ THolder<IEventBase> event);
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
TActorId OutgoingHandshakeActor;
- TInstant OutgoingHandshakeActorCreated;
- TInstant OutgoingHandshakeActorReset;
+ TInstant OutgoingHandshakeActorCreated;
+ TInstant OutgoingHandshakeActorReset;
- TInstant LastSessionDieTime;
+ TInstant LastSessionDieTime;
void GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev);
void Handle(TEvQueryStats::TPtr& ev);
- TDuration HoldByErrorWakeupDuration = TDuration::Zero();
- TEvents::TEvWakeup* HoldByErrorWakeupCookie;
+ TDuration HoldByErrorWakeupDuration = TDuration::Zero();
+ TEvents::TEvWakeup* HoldByErrorWakeupCookie;
- THolder<TProgramInfo> RemoteProgramInfo;
+ THolder<TProgramInfo> RemoteProgramInfo;
NInterconnect::TSecureSocketContext::TPtr SecureContext;
void Handle(TEvGetSecureSocket::TPtr ev) {
@@ -503,11 +503,11 @@ namespace NActors {
Send(ev->Sender, new TEvSecureSocket(std::move(socket)));
}
- TDeque<THolder<IEventHandle>> PendingIncomingHandshakeEvents;
+ TDeque<THolder<IEventHandle>> PendingIncomingHandshakeEvents;
TDeque<std::tuple<TInstant, TString, TString, ui32>> ErrorStateLog;
-
- void UpdateErrorStateLog(TInstant now, TString kind, TString explanation) {
+
+ void UpdateErrorStateLog(TInstant now, TString kind, TString explanation) {
ICPROXY_PROFILED;
if (ErrorStateLog) {
@@ -521,9 +521,9 @@ namespace NActors {
}
ErrorStateLog.emplace_back(now, std::move(kind), std::move(explanation), 1);
- if (ErrorStateLog.size() > 20) {
- ErrorStateLog.pop_front();
- }
+ if (ErrorStateLog.size() > 20) {
+ ErrorStateLog.pop_front();
+ }
}
void LogHandshakeFail(TEvHandshakeFail::TPtr& ev, bool inconclusive);
@@ -532,6 +532,6 @@ namespace NActors {
void HandleTerminate();
void PassAway() override;
- };
+ };
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index 31bac08b29..b95c994598 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -8,15 +8,15 @@
namespace NActors {
TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket)
- : TActor(&TThis::Initial)
+ : TActor(&TThis::Initial)
, TInterconnectLoggingBase(Sprintf("ICListener: %s", SelfId().ToString().data()))
- , Address(address.c_str(), port)
+ , Address(address.c_str(), port)
, Listener(
socket
? new NInterconnect::TStreamSocket(*socket)
: nullptr)
, ExternalSocket(!!Listener)
- , ProxyCommonCtx(std::move(common))
+ , ProxyCommonCtx(std::move(common))
{
if (ExternalSocket) {
SetNonBlock(*Listener);
@@ -24,13 +24,13 @@ namespace NActors {
}
TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
- return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
- }
+ return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
+ }
- void TInterconnectListenerTCP::Die(const TActorContext& ctx) {
+ void TInterconnectListenerTCP::Die(const TActorContext& ctx) {
LOG_DEBUG_IC("ICL08", "Dying");
- TActor::Die(ctx);
- }
+ TActor::Die(ctx);
+ }
int TInterconnectListenerTCP::Bind() {
NInterconnect::TAddress addr = Address;
@@ -57,7 +57,7 @@ namespace NActors {
Listener = NInterconnect::TStreamSocket::Make(addr.GetFamily());
if (*Listener == -1) {
return errno;
- }
+ }
SetNonBlock(*Listener);
Listener->SetSendBufferSize(ProxyCommonCtx->Settings.GetSendBufferSize()); // TODO(alexvru): WTF?
SetSockOpt(*Listener, SOL_SOCKET, SO_REUSEADDR, 1);
@@ -67,7 +67,7 @@ namespace NActors {
return e;
} else {
return 0;
- }
+ }
}
void TInterconnectListenerTCP::Bootstrap(const TActorContext& ctx) {
@@ -78,7 +78,7 @@ namespace NActors {
Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap);
return;
}
- }
+ }
if (const auto& callback = ProxyCommonCtx->InitWhiteboard) {
callback(Address.GetPort(), TlsActivationContext->ExecutorThread.ActorSystem);
}
@@ -112,6 +112,6 @@ namespace NActors {
}
break;
}
- }
+ }
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h
index 49e9f43dd6..fc71073c2d 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h
@@ -9,46 +9,46 @@
#include "events_local.h"
namespace NActors {
- class TInterconnectListenerTCP: public TActor<TInterconnectListenerTCP>, public TInterconnectLoggingBase {
- public:
- static constexpr EActivityType ActorActivityType() {
+ class TInterconnectListenerTCP: public TActor<TInterconnectListenerTCP>, public TInterconnectLoggingBase {
+ public:
+ static constexpr EActivityType ActorActivityType() {
return INTERCONNECT_COMMON;
- }
+ }
TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing());
int Bind();
- private:
- STFUNC(Initial) {
- switch (ev->GetTypeRewrite()) {
- CFunc(TEvents::TEvBootstrap::EventType, Bootstrap);
- CFunc(TEvents::TEvPoisonPill::EventType, Die);
- }
+ private:
+ STFUNC(Initial) {
+ switch (ev->GetTypeRewrite()) {
+ CFunc(TEvents::TEvBootstrap::EventType, Bootstrap);
+ CFunc(TEvents::TEvPoisonPill::EventType, Die);
+ }
}
- STFUNC(Listen) {
- switch (ev->GetTypeRewrite()) {
- CFunc(TEvents::TEvPoisonPill::EventType, Die);
+ STFUNC(Listen) {
+ switch (ev->GetTypeRewrite()) {
+ CFunc(TEvents::TEvPoisonPill::EventType, Die);
HFunc(TEvPollerRegisterResult, Handle);
CFunc(TEvPollerReady::EventType, Process);
- }
+ }
}
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override;
- void Die(const TActorContext& ctx) override;
+ void Die(const TActorContext& ctx) override;
- void Bootstrap(const TActorContext& ctx);
+ void Bootstrap(const TActorContext& ctx);
void Handle(TEvPollerRegisterResult::TPtr ev, const TActorContext& ctx);
void Process(const TActorContext& ctx);
- const NInterconnect::TAddress Address;
- TIntrusivePtr<NInterconnect::TStreamSocket> Listener;
+ const NInterconnect::TAddress Address;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Listener;
const bool ExternalSocket;
TPollerToken::TPtr PollerToken;
TInterconnectProxyCommon::TPtr const ProxyCommonCtx;
- };
+ };
static inline TActorId MakeInterconnectListenerActorId(bool dynamic) {
char x[12] = {'I', 'C', 'L', 'i', 's', 't', 'e', 'n', 'e', 'r', '/', dynamic ? 'D' : 'S'};
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 0e18a20072..2ded7f9f53 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -10,14 +10,14 @@
#include <library/cpp/monlib/service/pages/templates.h>
namespace NActors {
- LWTRACE_USING(ACTORLIB_PROVIDER);
+ LWTRACE_USING(ACTORLIB_PROVIDER);
- DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
+ DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
template<typename T>
T Coalesce(T&& x) {
return x;
- }
+ }
template<typename T, typename T2, typename... TRest>
typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) {
@@ -26,22 +26,22 @@ namespace NActors {
} else {
return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...);
}
- }
+ }
TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params)
- : TActor(&TInterconnectSessionTCP::StateFunc)
+ : TActor(&TInterconnectSessionTCP::StateFunc)
, Created(TInstant::Now())
- , Proxy(proxy)
+ , Proxy(proxy)
, CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this))
, LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this))
, Params(std::move(params))
- , TotalOutputQueueSize(0)
- , OutputStuckFlag(false)
- , OutputQueueUtilization(16)
- , OutputCounter(0ULL)
- {
+ , TotalOutputQueueSize(0)
+ , OutputStuckFlag(false)
+ , OutputQueueUtilization(16)
+ , OutputCounter(0ULL)
+ {
Proxy->Metrics->SetConnected(0);
- ReceiveContext.Reset(new TReceiveContext);
+ ReceiveContext.Reset(new TReceiveContext);
}
TInterconnectSessionTCP::~TInterconnectSessionTCP() {
@@ -62,7 +62,7 @@ namespace NActors {
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId));
SendUpdateToWhiteboard();
- }
+ }
void TInterconnectSessionTCP::CloseInputSession() {
Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession);
@@ -84,7 +84,7 @@ namespace NActors {
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
- }
+ }
Proxy->Metrics->SubSubscribersCount(Subscribers.size());
Subscribers.clear();
@@ -108,7 +108,7 @@ namespace NActors {
}
TActor::PassAway();
- }
+ }
void TInterconnectSessionTCP::PassAway() {
Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
@@ -118,13 +118,13 @@ namespace NActors {
Proxy->ValidateEvent(ev, "Forward");
LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
- ++MessagesGot;
+ ++MessagesGot;
- if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
+ if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Subscribe(ev);
- }
+ }
- ui16 evChannel = ev->GetChannel();
+ ui16 evChannel = ev->GetChannel();
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
@@ -136,31 +136,31 @@ namespace NActors {
if (!wasWorking) {
// this channel has returned to work -- it was empty and this we have just put first event in the queue
ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
- }
+ }
- SetOutputStuckFlag(true);
+ SetOutputStuckFlag(true);
++NumEventsInReadyChannels;
LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
- QueueSizeInEvents = oChannel.GetQueueSize(),
+ QueueSizeInEvents = oChannel.GetQueueSize(),
QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
- // check for overloaded queues
+ // check for overloaded queues
ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
- if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) {
+ if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) {
LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64,
- Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit);
+ Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit);
return Terminate(TDisconnectReason::QueueOverload());
- }
+ }
ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
CreateSessionKillingActor(Proxy->Common);
- }
- }
+ }
+ }
if (RamInQueue && !RamInQueue->Batching) {
// we have pending TEvRam, so GenerateTraffic will be called no matter what
@@ -198,12 +198,12 @@ namespace NActors {
void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
- }
+ }
THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
TEvHandshakeAsk *msg = ev->Get();
- // close existing input session, if any, and do nothing upon its destruction
+ // close existing input session, if any, and do nothing upon its destruction
ReestablishConnection({}, false, TDisconnectReason::NewSession());
const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial();
@@ -211,21 +211,21 @@ namespace NActors {
msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial);
return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params);
- }
+ }
void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) {
- if (ReceiverId) {
- // upon destruction of input session actor invoke this callback again
+ if (ReceiverId) {
+ // upon destruction of input session actor invoke this callback again
ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession());
- return;
- }
+ return;
+ }
LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64,
ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(),
i64(*ev->Get()->Socket));
NewConnectionSet = TActivationContext::Now();
- PacketsWrittenToSocket = 0;
+ PacketsWrittenToSocket = 0;
SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
Socket = std::move(ev->Get()->Socket);
@@ -233,21 +233,21 @@ namespace NActors {
// there may be a race
const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
- // arm watchdogs
+ // arm watchdogs
CloseOnIdleWatchdog.Arm(SelfId());
- // reset activity timestamps
+ // reset activity timestamps
LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
LOG_INFO_IC_SESSION("ICS10", "traffic start");
- // create input session actor
+ // create input session actor
auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
ReceiveContext->UnlockLastProcessedPacketSerial();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
- // register our socket in poller actor
+ // register our socket in poller actor
LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
Y_VERIFY(success);
@@ -257,31 +257,31 @@ namespace NActors {
Proxy->Metrics->SetConnected(1);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
- // arm pinger timer
+ // arm pinger timer
ResetFlushLogic();
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // REINITIALIZE SEND QUEUE
- //
- // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // REINITIALIZE SEND QUEUE
+ //
+ // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
// auxiliary packets; also reset packet metrics to zero to start sending from the beginning
- // also reset SendQueuePos
+ // also reset SendQueuePos
- // drop confirmed packets first as we do not need unwanted retransmissions
- SendQueuePos = SendQueue.end();
+ // drop confirmed packets first as we do not need unwanted retransmissions
+ SendQueuePos = SendQueue.end();
DropConfirmed(nextPacket);
for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) {
- const TSendQueue::iterator next = std::next(it);
- if (it->IsEmpty()) {
+ const TSendQueue::iterator next = std::next(it);
+ if (it->IsEmpty()) {
SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it);
- } else {
- it->ResetBufs();
- }
- it = next;
- }
+ } else {
+ it->ResetBufs();
+ }
+ it = next;
+ }
TrimSendQueueCache();
- SendQueuePos = SendQueue.begin();
+ SendQueuePos = SendQueue.begin();
TMaybe<ui64> s;
for (auto it = SendQueuePos; it != SendQueue.end(); ++it) {
@@ -295,13 +295,13 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
SendQueue.size(), LastConfirmed, serial);
- BytesUnwritten = 0;
- for (const auto& packet : SendQueue) {
+ BytesUnwritten = 0;
+ for (const auto& packet : SendQueue) {
BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) +
packet.GetDataSize();
- }
+ }
- SwitchStuckPeriod();
+ SwitchStuckPeriod();
LastHandshakeDone = TActivationContext::Now();
@@ -310,45 +310,45 @@ namespace NActors {
}
void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
- if (ev->Sender == ReceiverId) {
- TEvUpdateFromInputSession& msg = *ev->Get();
+ if (ev->Sender == ReceiverId) {
+ TEvUpdateFromInputSession& msg = *ev->Get();
// update ping time
Ping = msg.Ping;
LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat());
- bool needConfirm = false;
+ bool needConfirm = false;
- // update activity timer for dead peer checker
+ // update activity timer for dead peer checker
LastInputActivityTimestamp = TActivationContext::Now();
if (msg.NumDataBytes) {
- UnconfirmedBytes += msg.NumDataBytes;
+ UnconfirmedBytes += msg.NumDataBytes;
if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) {
- needConfirm = true;
- } else {
+ needConfirm = true;
+ } else {
SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod);
- }
+ }
- // reset payload watchdog that controls close-on-idle behaviour
+ // reset payload watchdog that controls close-on-idle behaviour
LastPayloadActivityTimestamp = TActivationContext::Now();
CloseOnIdleWatchdog.Reset();
- }
+ }
bool unblockedSomething = false;
LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
unblockedSomething = DropConfirmed(msg.ConfirmedByInput);
- }
+ }
// generate more traffic if we have unblocked state now
if (unblockedSomething) {
LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
GenerateTraffic();
- }
+ }
- // if we haven't generated any packets, then make a lone Flush packet without any data
- if (needConfirm && Socket) {
- ++ConfirmPacketsForcedBySize;
+ // if we haven't generated any packets, then make a lone Flush packet without any data
+ if (needConfirm && Socket) {
+ ++ConfirmPacketsForcedBySize;
MakePacket(false);
}
@@ -376,7 +376,7 @@ namespace NActors {
}
}
}
- }
+ }
void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
if (ev->Get() == RamInQueue) {
@@ -451,57 +451,57 @@ namespace NActors {
void TInterconnectSessionTCP::StartHandshake() {
LOG_INFO_IC_SESSION("ICS15", "start handshake");
IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
- }
+ }
void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
ReestablishConnection({}, true, std::move(reason));
- }
+ }
void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
TDisconnectReason reason) {
- if (Socket) {
+ if (Socket) {
LOG_INFO_IC_SESSION("ICS13", "reestablish connection");
ShutdownSocket(std::move(reason)); // stop sending/receiving on socket
PendingHandshakeDoneEvent = std::move(ev);
StartHandshakeOnSessionClose = startHandshakeOnSessionClose;
- if (!ReceiverId) {
+ if (!ReceiverId) {
ReestablishConnectionExecute();
- }
+ }
}
}
void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) {
- if (ev->Sender == ReceiverId) {
- const bool wasConnected(Socket);
+ if (ev->Sender == ReceiverId) {
+ const bool wasConnected(Socket);
LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
- if (wasConnected) {
- // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
- // restart handshake process, closing our part of socket first
+ if (wasConnected) {
+ // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
+ // restart handshake process, closing our part of socket first
ShutdownSocket(ev->Get()->Reason);
StartHandshake();
- } else {
+ } else {
ReestablishConnectionExecute();
- }
+ }
}
}
void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
- if (Socket) {
+ if (Socket) {
if (const TString& s = reason.ToString()) {
Proxy->Metrics->IncDisconnectByReason(s);
}
LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
- Socket->Shutdown(SHUT_RDWR);
- Socket.Reset();
+ Socket->Shutdown(SHUT_RDWR);
+ Socket.Reset();
Proxy->Metrics->IncDisconnections();
- CloseOnIdleWatchdog.Disarm();
+ CloseOnIdleWatchdog.Disarm();
LostConnectionWatchdog.Arm(SelfId());
Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
- }
+ }
}
void TInterconnectSessionTCP::ReestablishConnectionExecute() {
@@ -512,7 +512,7 @@ namespace NActors {
StartHandshake();
} else if (ev) {
SetNewConnection(ev);
- }
+ }
}
void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) {
@@ -527,7 +527,7 @@ namespace NActors {
GenerateTraffic();
} else if (!ev->Cookie) {
Proxy->Metrics->IncSpuriousWriteWakeups();
- }
+ }
if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
Send(ReceiverId, ev->Release().Release(), 0, 1);
}
@@ -548,10 +548,10 @@ namespace NActors {
void TInterconnectSessionTCP::WriteData() {
ui64 written = 0;
- Y_VERIFY(Socket); // ensure that socket wasn't closed
+ Y_VERIFY(Socket); // ensure that socket wasn't closed
LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
- constexpr ui32 iovLimit = 256;
+ constexpr ui32 iovLimit = 256;
#ifdef _linux_
ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
#else
@@ -561,8 +561,8 @@ namespace NActors {
maxElementsInIOV = 1;
}
- // vector of write buffers with preallocated stack space
- TStackVec<TConstIoVec, iovLimit> wbuffers;
+ // vector of write buffers with preallocated stack space
+ TStackVec<TConstIoVec, iovLimit> wbuffers;
LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
@@ -574,14 +574,14 @@ namespace NActors {
while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
- it->AppendToIoVector(wbuffers, maxElementsInIOV);
+ it->AppendToIoVector(wbuffers, maxElementsInIOV);
}
- const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
- int iovcnt = wbuffers.size();
+ const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
+ int iovcnt = wbuffers.size();
- Y_VERIFY(iovcnt > 0);
- Y_VERIFY(iovec->iov_len > 0);
+ Y_VERIFY(iovcnt > 0);
+ Y_VERIFY(iovec->iov_len > 0);
TString err;
ssize_t r = 0;
@@ -596,23 +596,23 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
- wbuffers.clear();
+ wbuffers.clear();
- if (r > 0) {
- Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
- BytesUnwritten -= r;
+ if (r > 0) {
+ Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
+ BytesUnwritten -= r;
written += r;
ui64 packets = 0;
- // advance SendQueuePos to eat all processed items
- for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
+ // advance SendQueuePos to eat all processed items
+ for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
if (!SendQueuePos->IsEmpty()) {
LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
}
- ++PacketsWrittenToSocket;
+ ++PacketsWrittenToSocket;
++packets;
LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
- }
+ }
LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
} else if (-r != EAGAIN && -r != EWOULDBLOCK) {
@@ -624,15 +624,15 @@ namespace NActors {
Proxy->Metrics->AddTotalBytesWritten(written);
}
return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
- } else {
+ } else {
// we have to do some hack for secure socket -- mark the packet as 'tried writing'
if (Params.Encryption) {
Y_VERIFY(SendQueuePos != SendQueue.end());
SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL
}
- // we have received EAGAIN error code, this means that we can't issue more data until we have received
- // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
+ // we have received EAGAIN error code, this means that we can't issue more data until we have received
+ // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
ReceiveContext->WriteBlockedByFullSendBuffer = true;
WriteBlockedCycles = GetCycleCountFast();
@@ -646,7 +646,7 @@ namespace NActors {
} else {
PollerToken->Request(false, true);
}
- }
+ }
}
}
}
@@ -656,12 +656,12 @@ namespace NActors {
}
void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
- if (period != TDuration::Max()) {
+ if (period != TDuration::Max()) {
const TInstant when = TActivationContext::Now() + period;
- if (when < ForcePacketTimestamp) {
- ForcePacketTimestamp = when;
+ if (when < ForcePacketTimestamp) {
+ ForcePacketTimestamp = when;
ScheduleFlush();
- }
+ }
}
}
@@ -671,7 +671,7 @@ namespace NActors {
FlushSchedule.push(ForcePacketTimestamp);
MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
++FlushEventsScheduled;
- }
+ }
}
void TInterconnectSessionTCP::HandleFlush() {
@@ -692,12 +692,12 @@ namespace NActors {
}
void TInterconnectSessionTCP::ResetFlushLogic() {
- ForcePacketTimestamp = TInstant::Max();
- UnconfirmedBytes = 0;
+ ForcePacketTimestamp = TInstant::Max();
+ UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
SetForcePacketTimestamp(ping);
- }
+ }
}
void TInterconnectSessionTCP::TrimSendQueueCache() {
@@ -719,19 +719,19 @@ namespace NActors {
}
ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
- Y_VERIFY(Socket);
-
- TSendQueue::iterator packet;
- if (SendQueueCache) {
- // we have entries in cache, take one and move it to the end of SendQueue
- packet = SendQueueCache.begin();
- SendQueue.splice(SendQueue.end(), SendQueueCache, packet);
- packet->Reuse(); // reset packet to initial state
- } else {
- // we have to allocate new packet, so just do it
+ Y_VERIFY(Socket);
+
+ TSendQueue::iterator packet;
+ if (SendQueueCache) {
+ // we have entries in cache, take one and move it to the end of SendQueue
+ packet = SendQueueCache.begin();
+ SendQueue.splice(SendQueue.end(), SendQueueCache, packet);
+ packet->Reuse(); // reset packet to initial state
+ } else {
+ // we have to allocate new packet, so just do it
LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) {
packet = SendQueue.emplace(SendQueue.end(), Params);
- }
+ }
}
// update send queue position
@@ -741,7 +741,7 @@ namespace NActors {
ui64 serial = 0;
- if (data) {
+ if (data) {
// generate serial for this data packet
serial = ++OutputCounter;
@@ -749,21 +749,21 @@ namespace NActors {
Y_VERIFY(NumEventsInReadyChannels);
LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
FillSendingBuffer(*packet, serial);
- }
+ }
Y_VERIFY(!packet->IsEmpty());
InflightDataAmount += packet->GetDataSize();
Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
if (InflightDataAmount > GetTotalInflightAmountOfData()) {
Proxy->Metrics->IncInflyLimitReach();
- }
+ }
- if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
+ if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast());
- AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
- }
-
- // update payload activity timer
+ AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
+ }
+
+ // update payload activity timer
LastPayloadActivityTimestamp = TActivationContext::Now();
} else if (pingMask) {
serial = *pingMask;
@@ -781,13 +781,13 @@ namespace NActors {
SendQueue.splice(std::next(SendQueuePos), SendQueue, packet);
}
}
- }
+ }
const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
packet->SetMetadata(serial, lastInputSerial);
- packet->Sign();
+ packet->Sign();
- // count number of bytes pending for write
+ // count number of bytes pending for write
ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
BytesUnwritten += packetSize;
@@ -795,15 +795,15 @@ namespace NActors {
" InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
InflightDataAmount, BytesUnwritten);
- // reset forced packet sending timestamp as we have confirmed all received data
+ // reset forced packet sending timestamp as we have confirmed all received data
ResetFlushLogic();
- ++PacketsGenerated;
+ ++PacketsGenerated;
LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
- if (!data) {
+ if (!data) {
WriteData();
- }
+ }
return packetSize;
}
@@ -814,21 +814,21 @@ namespace NActors {
Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
"%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64,
LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
- LastConfirmed = confirm;
+ LastConfirmed = confirm;
- ui64 droppedDataAmount = 0;
- ui32 numDropped = 0;
+ ui64 droppedDataAmount = 0;
+ ui32 numDropped = 0;
- // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
- // making Serial <= confirm true
- TSendQueue::iterator it;
+ // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
+ // making Serial <= confirm true
+ TSendQueue::iterator it;
ui64 lastDroppedSerial = 0;
for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) {
if (!it->IsEmpty()) {
lastDroppedSerial = it->GetSerial();
}
droppedDataAmount += it->GetDataSize();
- ++numDropped;
+ ++numDropped;
}
SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it);
TrimSendQueueCache();
@@ -840,7 +840,7 @@ namespace NActors {
const ui64 limit = GetTotalInflightAmountOfData();
const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
- PacketsConfirmed += numDropped;
+ PacketsConfirmed += numDropped;
InflightDataAmount -= droppedDataAmount;
Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
@@ -851,7 +851,7 @@ namespace NActors {
Pool->Trim(); // send any unsent free requests
return unblockedSomething;
- }
+ }
void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
ui32 bytesGenerated = 0;
@@ -914,42 +914,42 @@ namespace NActors {
const ui32 utilization = Socket ? CalculateQueueUtilization() : 0;
if (const auto& callback = Proxy->Common->UpdateWhiteboard) {
- enum class EFlag {
- GREEN,
- YELLOW,
- ORANGE,
- RED,
- };
- EFlag flagState = EFlag::RED;
-
- if (Socket) {
- flagState = EFlag::GREEN;
-
- do {
+ enum class EFlag {
+ GREEN,
+ YELLOW,
+ ORANGE,
+ RED,
+ };
+ EFlag flagState = EFlag::RED;
+
+ if (Socket) {
+ flagState = EFlag::GREEN;
+
+ do {
auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
- if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
- flagState = EFlag::ORANGE;
- break;
- } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) {
- flagState = EFlag::YELLOW;
- }
-
- // check utilization
+ if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
+ flagState = EFlag::ORANGE;
+ break;
+ } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) {
+ flagState = EFlag::YELLOW;
+ }
+
+ // check utilization
if (utilization > 875000) { // 7/8
- flagState = EFlag::ORANGE;
- break;
+ flagState = EFlag::ORANGE;
+ break;
} else if (utilization > 500000) { // 1/2
- flagState = EFlag::YELLOW;
- }
- } while (false);
- }
+ flagState = EFlag::YELLOW;
+ }
+ } while (false);
+ }
callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
- connected,
- flagState == EFlag::GREEN,
- flagState == EFlag::YELLOW,
- flagState == EFlag::ORANGE,
- flagState == EFlag::RED,
+ connected,
+ flagState == EFlag::GREEN,
+ flagState == EFlag::YELLOW,
+ flagState == EFlag::ORANGE,
+ flagState == EFlag::RED,
TlsActivationContext->ExecutorThread.ActorSystem);
}
@@ -958,34 +958,34 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) {
- if (OutputStuckFlag == state)
- return;
+ void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) {
+ if (OutputStuckFlag == state)
+ return;
- if (OutputQueueUtilization.Size() == 0)
- return;
+ if (OutputQueueUtilization.Size() == 0)
+ return;
- auto& lastpair = OutputQueueUtilization.Last();
- if (state)
+ auto& lastpair = OutputQueueUtilization.Last();
+ if (state)
lastpair.first -= GetCycleCountFast();
- else
+ else
lastpair.first += GetCycleCountFast();
- OutputStuckFlag = state;
- }
+ OutputStuckFlag = state;
+ }
- void TInterconnectSessionTCP::SwitchStuckPeriod() {
+ void TInterconnectSessionTCP::SwitchStuckPeriod() {
auto now = GetCycleCountFast();
- if (OutputQueueUtilization.Size() != 0) {
- auto& lastpair = OutputQueueUtilization.Last();
- lastpair.second = now - lastpair.second;
- if (OutputStuckFlag)
- lastpair.first += now;
- }
-
- OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now));
+ if (OutputQueueUtilization.Size() != 0) {
+ auto& lastpair = OutputQueueUtilization.Last();
+ lastpair.second = now - lastpair.second;
+ if (OutputStuckFlag)
+ lastpair.first += now;
+ }
+
+ OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now));
if (OutputStuckFlag)
- OutputQueueUtilization.Last().first -= now;
+ OutputQueueUtilization.Last().first -= now;
}
TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const {
@@ -1029,26 +1029,26 @@ namespace NActors {
}
void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) {
- HTML(str) {
- DIV_CLASS("panel panel-info") {
- DIV_CLASS("panel-heading") {
- str << "Session";
- }
- DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
- TABLER() {
- TABLEH() {
- str << "Sensor";
- }
- TABLEH() {
- str << "Value";
- }
- }
+ HTML(str) {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ str << "Session";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() {
+ str << "Sensor";
+ }
+ TABLEH() {
+ str << "Value";
+ }
+ }
}
- TABLEBODY() {
- TABLER() {
- TABLED() {
+ TABLEBODY() {
+ TABLER() {
+ TABLED() {
str << "Encryption";
}
TABLED() {
@@ -1111,89 +1111,89 @@ namespace NActors {
}
TABLER() {
TABLED() {
- str << "This page generated at";
- }
- TABLED() {
+ str << "This page generated at";
+ }
+ TABLED() {
str << TActivationContext::Now() << " / " << Now();
- }
- }
- TABLER() {
- TABLED() {
- str << "SelfID";
- }
- TABLED() {
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "SelfID";
+ }
+ TABLED() {
str << SelfId().ToString();
- }
- }
+ }
+ }
TABLER() {
TABLED() { str << "Frame version/Checksum"; }
TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); }
}
-#define MON_VAR(NAME) \
- TABLER() { \
- TABLED() { \
- str << #NAME; \
- } \
- TABLED() { \
- str << NAME; \
- } \
- }
-
- MON_VAR(Created)
- MON_VAR(NewConnectionSet)
- MON_VAR(ReceiverId)
- MON_VAR(MessagesGot)
- MON_VAR(MessagesWrittenToBuffer)
- MON_VAR(PacketsGenerated)
- MON_VAR(PacketsWrittenToSocket)
- MON_VAR(PacketsConfirmed)
- MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
- MON_VAR(ConfirmPacketsForcedBySize)
- MON_VAR(ConfirmPacketsForcedByTimeout)
-
- TABLER() {
- TABLED() {
- str << "Virtual self ID";
- }
- TABLED() {
- str << Proxy->SessionVirtualId.ToString();
- }
+#define MON_VAR(NAME) \
+ TABLER() { \
+ TABLED() { \
+ str << #NAME; \
+ } \
+ TABLED() { \
+ str << NAME; \
+ } \
+ }
+
+ MON_VAR(Created)
+ MON_VAR(NewConnectionSet)
+ MON_VAR(ReceiverId)
+ MON_VAR(MessagesGot)
+ MON_VAR(MessagesWrittenToBuffer)
+ MON_VAR(PacketsGenerated)
+ MON_VAR(PacketsWrittenToSocket)
+ MON_VAR(PacketsConfirmed)
+ MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
+ MON_VAR(ConfirmPacketsForcedBySize)
+ MON_VAR(ConfirmPacketsForcedByTimeout)
+
+ TABLER() {
+ TABLED() {
+ str << "Virtual self ID";
+ }
+ TABLED() {
+ str << Proxy->SessionVirtualId.ToString();
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Virtual peer ID";
+ }
+ TABLED() {
+ str << Proxy->RemoteSessionVirtualId.ToString();
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Socket";
+ }
+ TABLED() {
+ str << (Socket ? i64(*Socket) : -1);
+ }
}
- TABLER() {
- TABLED() {
- str << "Virtual peer ID";
- }
- TABLED() {
- str << Proxy->RemoteSessionVirtualId.ToString();
- }
- }
- TABLER() {
- TABLED() {
- str << "Socket";
- }
- TABLED() {
- str << (Socket ? i64(*Socket) : -1);
- }
- }
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
- MON_VAR(OutputStuckFlag)
- MON_VAR(SendQueue.size())
- MON_VAR(SendQueueCache.size())
+ MON_VAR(OutputStuckFlag)
+ MON_VAR(SendQueue.size())
+ MON_VAR(SendQueueCache.size())
MON_VAR(NumEventsInReadyChannels)
- MON_VAR(TotalOutputQueueSize)
- MON_VAR(BytesUnwritten)
+ MON_VAR(TotalOutputQueueSize)
+ MON_VAR(BytesUnwritten)
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
MON_VAR(SendBufferSize)
- MON_VAR(LastInputActivityTimestamp)
- MON_VAR(LastPayloadActivityTimestamp)
- MON_VAR(LastHandshakeDone)
- MON_VAR(OutputCounter)
+ MON_VAR(LastInputActivityTimestamp)
+ MON_VAR(LastPayloadActivityTimestamp)
+ MON_VAR(LastHandshakeDone)
+ MON_VAR(OutputCounter)
MON_VAR(LastSentSerial)
MON_VAR(ReceiveContext->GetLastProcessedPacketSerial())
- MON_VAR(LastConfirmed)
+ MON_VAR(LastConfirmed)
MON_VAR(FlushSchedule.size())
MON_VAR(MaxFlushSchedule)
MON_VAR(FlushEventsScheduled)
@@ -1211,11 +1211,11 @@ namespace NActors {
MON_VAR(GetPingRTT())
MON_VAR(clockSkew)
- MON_VAR(GetDeadPeerTimeout())
+ MON_VAR(GetDeadPeerTimeout())
MON_VAR(GetTotalInflightAmountOfData())
MON_VAR(GetCloseOnIdleTimeout())
MON_VAR(Subscribers.size())
- }
+ }
}
}
}
@@ -1224,5 +1224,5 @@ namespace NActors {
void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) {
TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common));
- }
+ }
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 1ecd829132..7fc00dbcc5 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -30,61 +30,61 @@
#include <unordered_map>
namespace NActors {
- class TSlowPathChecker {
- using TTraceCallback = std::function<void(double)>;
- TTraceCallback Callback;
- const NHPTimer::STime Start;
-
- public:
- TSlowPathChecker(TTraceCallback&& callback)
- : Callback(std::move(callback))
+ class TSlowPathChecker {
+ using TTraceCallback = std::function<void(double)>;
+ TTraceCallback Callback;
+ const NHPTimer::STime Start;
+
+ public:
+ TSlowPathChecker(TTraceCallback&& callback)
+ : Callback(std::move(callback))
, Start(GetCycleCountFast())
- {
- }
+ {
+ }
- ~TSlowPathChecker() {
+ ~TSlowPathChecker() {
const NHPTimer::STime end = GetCycleCountFast();
- const NHPTimer::STime elapsed = end - Start;
- if (elapsed > 1000000) {
- Callback(NHPTimer::GetSeconds(elapsed) * 1000);
- }
- }
-
- operator bool() const {
- return false;
+ const NHPTimer::STime elapsed = end - Start;
+ if (elapsed > 1000000) {
+ Callback(NHPTimer::GetSeconds(elapsed) * 1000);
+ }
}
- };
-#define LWPROBE_IF_TOO_LONG(...) \
- if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \
- ; \
- else
+ operator bool() const {
+ return false;
+ }
+ };
- class TTimeLimit {
- public:
- TTimeLimit(ui64 limitInCycles)
+#define LWPROBE_IF_TOO_LONG(...) \
+ if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \
+ ; \
+ else
+
+ class TTimeLimit {
+ public:
+ TTimeLimit(ui64 limitInCycles)
: UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles)
- {
- }
+ {
+ }
- TTimeLimit(ui64 startTS, ui64 limitInCycles)
- : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles)
- {
- }
+ TTimeLimit(ui64 startTS, ui64 limitInCycles)
+ : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles)
+ {
+ }
- bool CheckExceeded() {
+ bool CheckExceeded() {
return UpperLimit != 0 && GetCycleCountFast() > UpperLimit;
- }
+ }
- const ui64 UpperLimit;
- };
+ const ui64 UpperLimit;
+ };
- static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10);
+ static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10);
static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10);
- static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024;
- static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024;
+ static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024;
+ static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024;
- class TInterconnectProxyTCP;
+ class TInterconnectProxyTCP;
enum class EUpdateState : ui8 {
NONE, // no updates generated by input session yet
@@ -93,22 +93,22 @@ namespace NActors {
CONFIRMING, // confirmation inflight
};
- struct TReceiveContext: public TAtomicRefCount<TReceiveContext> {
- /* All invokations to these fields should be thread-safe */
+ struct TReceiveContext: public TAtomicRefCount<TReceiveContext> {
+ /* All invokations to these fields should be thread-safe */
- ui64 ControlPacketSendTimer = 0;
- ui64 ControlPacketId = 0;
+ ui64 ControlPacketSendTimer = 0;
+ ui64 ControlPacketId = 0;
- // number of packets received by input session
- TAtomic PacketsReadFromSocket = 0;
- TAtomic DataPacketsReadFromSocket = 0;
+ // number of packets received by input session
+ TAtomic PacketsReadFromSocket = 0;
+ TAtomic DataPacketsReadFromSocket = 0;
// last processed packet by input session
std::atomic_uint64_t LastProcessedPacketSerial = 0;
static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63;
- // for hardened checks
- TAtomic NumInputSessions = 0;
+ // for hardened checks
+ TAtomic NumInputSessions = 0;
NHPTimer::STime StartTime;
@@ -160,9 +160,9 @@ namespace NActors {
ui64 GetLastProcessedPacketSerial() {
return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
}
- };
+ };
- class TInputSessionTCP
+ class TInputSessionTCP
: public TActorBootstrapped<TInputSessionTCP>
, public TInterconnectLoggingBase
{
@@ -174,14 +174,14 @@ namespace NActors {
struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {};
struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {};
- public:
- static constexpr EActivityType ActorActivityType() {
+ public:
+ static constexpr EActivityType ActorActivityType() {
return INTERCONNECT_SESSION_TCP;
- }
+ }
TInputSessionTCP(const TActorId& sessionId,
- TIntrusivePtr<NInterconnect::TStreamSocket> socket,
- TIntrusivePtr<TReceiveContext> context,
+ TIntrusivePtr<NInterconnect::TStreamSocket> socket,
+ TIntrusivePtr<TReceiveContext> context,
TInterconnectProxyCommon::TPtr common,
std::shared_ptr<IInterconnectMetrics> metrics,
ui32 nodeId,
@@ -189,8 +189,8 @@ namespace NActors {
TDuration deadPeerTimeout,
TSessionParams params);
- private:
- friend class TActorBootstrapped<TInputSessionTCP>;
+ private:
+ friend class TActorBootstrapped<TInputSessionTCP>;
void Bootstrap();
@@ -204,13 +204,13 @@ namespace NActors {
cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate)
)
- private:
+ private:
TRope IncomingData;
const TActorId SessionId;
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
TPollerToken::TPtr PollerToken;
- TIntrusivePtr<TReceiveContext> Context;
+ TIntrusivePtr<TReceiveContext> Context;
TInterconnectProxyCommon::TPtr Common;
const ui32 NodeId;
const TSessionParams Params;
@@ -235,11 +235,11 @@ namespace NActors {
THolder<TEvUpdateFromInputSession> UpdateFromInputSession;
- ui64 ConfirmedByInput;
+ ui64 ConfirmedByInput;
std::shared_ptr<IInterconnectMetrics> Metrics;
- bool CloseInputSessionRequested = false;
+ bool CloseInputSessionRequested = false;
void CloseInputSession();
@@ -258,12 +258,12 @@ namespace NActors {
TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers;
- static constexpr size_t NumPreallocatedBuffers = 16;
- void PreallocateBuffers();
+ static constexpr size_t NumPreallocatedBuffers = 16;
+ void PreallocateBuffers();
- inline ui64 GetMaxCyclesPerEvent() const {
+ inline ui64 GetMaxCyclesPerEvent() const {
return DurationToCycles(TDuration::MicroSeconds(500));
- }
+ }
const TDuration DeadPeerTimeout;
TInstant LastReceiveTimestamp;
@@ -278,21 +278,21 @@ namespace NActors {
void HandlePingResponse(TDuration passed);
void HandleClock(TInstant clock);
- };
+ };
- class TInterconnectSessionTCP
+ class TInterconnectSessionTCP
: public TActor<TInterconnectSessionTCP>
, public TInterconnectLoggingBase
{
- enum {
+ enum {
EvCheckCloseOnIdle = EventSpaceBegin(TEvents::ES_PRIVATE),
EvCheckLostConnection,
EvRam,
EvTerminate,
EvFreeItems,
- };
+ };
- struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
+ struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
struct TEvCheckLostConnection : TEventLocal<TEvCheckLostConnection, EvCheckLostConnection> {};
struct TEvRam : TEventLocal<TEvRam, EvRam> {
@@ -308,18 +308,18 @@ namespace NActors {
{}
};
- const TInstant Created;
- TInstant NewConnectionSet;
- ui64 MessagesGot = 0;
- ui64 MessagesWrittenToBuffer = 0;
- ui64 PacketsGenerated = 0;
- ui64 PacketsWrittenToSocket = 0;
- ui64 PacketsConfirmed = 0;
+ const TInstant Created;
+ TInstant NewConnectionSet;
+ ui64 MessagesGot = 0;
+ ui64 MessagesWrittenToBuffer = 0;
+ ui64 PacketsGenerated = 0;
+ ui64 PacketsWrittenToSocket = 0;
+ ui64 PacketsConfirmed = 0;
- public:
- static constexpr EActivityType ActorActivityType() {
- return INTERCONNECT_SESSION_TCP;
- }
+ public:
+ static constexpr EActivityType ActorActivityType() {
+ return INTERCONNECT_SESSION_TCP;
+ }
TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params);
~TInterconnectSessionTCP();
@@ -339,8 +339,8 @@ namespace NActors {
return ReceiveContext->ClockSkew_us;
}
- private:
- friend class TInterconnectProxyTCP;
+ private:
+ friend class TInterconnectProxyTCP;
void Handle(TEvTerminate::TPtr& ev);
void HandlePoison();
@@ -400,7 +400,7 @@ namespace NActors {
void ReestablishConnectionWithHandshake(TDisconnectReason reason);
void ReestablishConnectionExecute();
- TInterconnectProxyTCP* const Proxy;
+ TInterconnectProxyTCP* const Proxy;
// various connection settings access
TDuration GetDeadPeerTimeout() const;
@@ -418,11 +418,11 @@ namespace NActors {
void IssuePingRequest();
void Handle(TEvProcessPingRequest::TPtr ev);
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TInstant LastInputActivityTimestamp;
- TInstant LastPayloadActivityTimestamp;
- TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
+ TInstant LastInputActivityTimestamp;
+ TInstant LastPayloadActivityTimestamp;
+ TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog;
void OnCloseOnIdleTimerHit() {
@@ -435,26 +435,26 @@ namespace NActors {
Terminate(TDisconnectReason::LostConnection());
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
const TSessionParams Params;
TMaybe<TEventHolderPool> Pool;
TMaybe<TChannelScheduler> ChannelScheduler;
- ui64 TotalOutputQueueSize;
- bool OutputStuckFlag;
- TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization;
+ ui64 TotalOutputQueueSize;
+ bool OutputStuckFlag;
+ TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization;
size_t NumEventsInReadyChannels = 0;
- void SetOutputStuckFlag(bool state);
- void SwitchStuckPeriod();
+ void SetOutputStuckFlag(bool state);
+ void SwitchStuckPeriod();
- using TSendQueue = TList<TTcpPacketOutTask>;
- TSendQueue SendQueue;
- TSendQueue SendQueueCache;
- TSendQueue::iterator SendQueuePos;
+ using TSendQueue = TList<TTcpPacketOutTask>;
+ TSendQueue SendQueue;
+ TSendQueue SendQueueCache;
+ TSendQueue::iterator SendQueuePos;
ui64 WriteBlockedCycles = 0; // start of current block period
TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
- ui64 BytesUnwritten = 0;
+ ui64 BytesUnwritten = 0;
void TrimSendQueueCache();
@@ -467,21 +467,21 @@ namespace NActors {
}
}
- ui64 OutputCounter;
+ ui64 OutputCounter;
ui64 LastSentSerial = 0;
- TInstant LastHandshakeDone;
+ TInstant LastHandshakeDone;
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
TPollerToken::TPtr PollerToken;
ui32 SendBufferSize;
ui64 InflightDataAmount = 0;
std::unordered_map<TActorId, ui64, TActorId::THash> Subscribers;
- // time at which we want to send confirmation packet even if there was no outgoing data
- ui64 UnconfirmedBytes = 0;
- TInstant ForcePacketTimestamp = TInstant::Max();
+ // time at which we want to send confirmation packet even if there was no outgoing data
+ ui64 UnconfirmedBytes = 0;
+ TInstant ForcePacketTimestamp = TInstant::Max();
TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule;
size_t MaxFlushSchedule = 0;
ui64 FlushEventsScheduled = 0;
@@ -494,71 +494,71 @@ namespace NActors {
void GenerateHttpInfo(TStringStream& str);
- TIntrusivePtr<TReceiveContext> ReceiveContext;
+ TIntrusivePtr<TReceiveContext> ReceiveContext;
TActorId ReceiverId;
TDuration Ping;
- ui64 ConfirmPacketsForcedBySize = 0;
- ui64 ConfirmPacketsForcedByTimeout = 0;
+ ui64 ConfirmPacketsForcedBySize = 0;
+ ui64 ConfirmPacketsForcedByTimeout = 0;
- ui64 LastConfirmed = 0;
+ ui64 LastConfirmed = 0;
TEvHandshakeDone::TPtr PendingHandshakeDoneEvent;
bool StartHandshakeOnSessionClose = false;
ui64 EqualizeCounter = 0;
- };
+ };
- class TInterconnectSessionKiller
- : public TActorBootstrapped<TInterconnectSessionKiller> {
- ui32 RepliesReceived = 0;
- ui32 RepliesNumber = 0;
+ class TInterconnectSessionKiller
+ : public TActorBootstrapped<TInterconnectSessionKiller> {
+ ui32 RepliesReceived = 0;
+ ui32 RepliesNumber = 0;
TActorId LargestSession = TActorId();
- ui64 MaxBufferSize = 0;
+ ui64 MaxBufferSize = 0;
TInterconnectProxyCommon::TPtr Common;
- public:
+ public:
static constexpr EActivityType ActorActivityType() {
return INTERCONNECT_SESSION_KILLER;
}
TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common)
- : Common(common)
- {
- }
+ : Common(common)
+ {
+ }
void Bootstrap() {
- auto sender = SelfId();
+ auto sender = SelfId();
const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
- auto ev = new TEvSessionBufferSizeRequest();
- return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
- };
+ auto ev = new TEvSessionBufferSizeRequest();
+ return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
+ };
RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
- Become(&TInterconnectSessionKiller::StateFunc);
- }
+ Become(&TInterconnectSessionKiller::StateFunc);
+ }
STRICT_STFUNC(StateFunc,
hFunc(TEvSessionBufferSizeResponse, ProcessResponse)
cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered)
)
-
+
void ProcessResponse(TEvSessionBufferSizeResponse::TPtr& ev) {
- RepliesReceived++;
- if (MaxBufferSize < ev->Get()->BufferSize) {
- MaxBufferSize = ev->Get()->BufferSize;
- LargestSession = ev->Get()->SessionID;
- }
- if (RepliesReceived == RepliesNumber) {
+ RepliesReceived++;
+ if (MaxBufferSize < ev->Get()->BufferSize) {
+ MaxBufferSize = ev->Get()->BufferSize;
+ LargestSession = ev->Get()->SessionID;
+ }
+ if (RepliesReceived == RepliesNumber) {
Send(LargestSession, new TEvents::TEvPoisonPill);
- AtomicUnlock(&Common->StartedSessionKiller);
+ AtomicUnlock(&Common->StartedSessionKiller);
PassAway();
- }
+ }
}
-
+
void ProcessUndelivered() {
- RepliesReceived++;
+ RepliesReceived++;
}
- };
+ };
void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common);
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index f166ca0a99..2a8443da71 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -34,9 +34,9 @@ namespace NInterconnect {
CFunc(TEvents::TSystem::PoisonPill, Die);
)
- void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
ui64 bytes = ev->Get()->CalculateSerializedSizeCached();
- auto& record = ev->Get()->Record;
+ auto& record = ev->Get()->Record;
auto *hops = record.MutableHops();
while (!hops->empty() && !hops->begin()->HasNextHop()) {
record.ClearPayload();
@@ -81,7 +81,7 @@ namespace NInterconnect {
CFunc(TEvents::TSystem::PoisonPill, Die);
)
- void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]));
if (++SlaveIndex == Slaves.size()) {
SlaveIndex = 0;
@@ -107,7 +107,7 @@ namespace NInterconnect {
TLoadResponderMasterActor()
{}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap(const TActorContext& ctx) {
Become(&TLoadResponderMasterActor::StateFunc);
while (Slaves.size() < 10) {
Slaves.push_back(ctx.Register(new TLoadResponderActor(Traffic)));
@@ -118,7 +118,7 @@ namespace NInterconnect {
std::shared_ptr<std::atomic_uint64_t> Traffic = std::make_shared<std::atomic_uint64_t>();
};
- IActor* CreateLoadResponderActor() {
+ IActor* CreateLoadResponderActor() {
return new TLoadResponderMasterActor();
}
@@ -127,17 +127,17 @@ namespace NInterconnect {
return TActorId(nodeId, TStringBuf(x, 12));
}
- class TLoadActor: public TActorBootstrapped<TLoadActor> {
+ class TLoadActor: public TActorBootstrapped<TLoadActor> {
struct TEvGenerateMessages : TEventLocal<TEvGenerateMessages, EvGenerateMessages> {};
struct TEvPublishResults : TEventLocal<TEvPublishResults, EvPublishResults> {};
struct TMessageInfo {
TInstant SendTimestamp;
- TMessageInfo(const TInstant& sendTimestamp)
+ TMessageInfo(const TInstant& sendTimestamp)
: SendTimestamp(sendTimestamp)
- {
- }
+ {
+ }
};
const TLoadParams Params;
@@ -154,14 +154,14 @@ namespace NInterconnect {
return IActor::INTERCONNECT_LOAD_ACTOR;
}
- TLoadActor(const TLoadParams& params)
+ TLoadActor(const TLoadParams& params)
: Params(params)
{}
void Bootstrap(const TActorContext& ctx) {
Become(&TLoadActor::QueryTrafficCounter);
ctx.Send(MakeLoadResponderActorId(SelfId().NodeId()), new TEvQueryTrafficCounter);
- }
+ }
void Handle(TEvTrafficCounter::TPtr ev, const TActorContext& ctx) {
Traffic = std::move(ev->Get()->Traffic);
@@ -185,7 +185,7 @@ namespace NInterconnect {
SchedulePublishResults(ctx);
}
- void GenerateMessages(const TActorContext& ctx) {
+ void GenerateMessages(const TActorContext& ctx) {
while (InFly.size() < Params.InFlyMax && ctx.Now() >= NextMessageTimestamp) {
// generate payload
const ui32 size = Params.SizeMin + RandomNumber(Params.SizeMax - Params.SizeMin + 1);
@@ -232,8 +232,8 @@ namespace NInterconnect {
}
}
- void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record;
+ void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
auto it = InFly.find(record.GetId());
if (it != InFly.end()) {
// record message rtt
@@ -294,18 +294,18 @@ namespace NInterconnect {
TQueue<std::pair<TInstant, TString>> TimeoutQueue;
- void PutTimeoutQueueItem(const TActorContext& ctx, TString id) {
+ void PutTimeoutQueueItem(const TActorContext& ctx, TString id) {
TimeoutQueue.emplace(ctx.Now() + TDuration::Minutes(1), std::move(id));
if (TimeoutQueue.size() == 1) {
ScheduleWakeup(ctx);
}
}
- void ScheduleWakeup(const TActorContext& ctx) {
+ void ScheduleWakeup(const TActorContext& ctx) {
ctx.Schedule(TimeoutQueue.front().first - ctx.Now(), new TEvents::TEvWakeup);
}
- void HandleWakeup(const TActorContext& ctx) {
+ void HandleWakeup(const TActorContext& ctx) {
ui32 numDropped = 0;
while (TimeoutQueue && TimeoutQueue.front().first <= ctx.Now()) {
@@ -326,11 +326,11 @@ namespace NInterconnect {
const TDuration ResultPublishPeriod = TDuration::Seconds(15);
- void SchedulePublishResults(const TActorContext& ctx) {
+ void SchedulePublishResults(const TActorContext& ctx) {
ctx.Schedule(ResultPublishPeriod, new TEvPublishResults);
}
- void PublishResults(const TActorContext& ctx, bool schedule = true) {
+ void PublishResults(const TActorContext& ctx, bool schedule = true) {
const TInstant now = ctx.Now();
TStringStream msg;
@@ -354,7 +354,7 @@ namespace NInterconnect {
msg << "{window# " << duration << " samples# " << Histogram.size();
TVector<TDuration> v;
v.reserve(Histogram.size());
- for (const auto& item : Histogram) {
+ for (const auto& item : Histogram) {
v.push_back(item.second);
}
std::sort(v.begin(), v.end());
@@ -398,8 +398,8 @@ namespace NInterconnect {
}
};
- IActor* CreateLoadActor(const TLoadParams& params) {
+ IActor* CreateLoadActor(const TLoadParams& params) {
return new TLoadActor(params);
}
-}
+}
diff --git a/library/cpp/actors/interconnect/load.h b/library/cpp/actors/interconnect/load.h
index 78c80c8e17..0a01a0dc04 100644
--- a/library/cpp/actors/interconnect/load.h
+++ b/library/cpp/actors/interconnect/load.h
@@ -4,21 +4,21 @@
namespace NInterconnect {
// load responder -- lives on every node as a service actor
- NActors::IActor* CreateLoadResponderActor();
+ NActors::IActor* CreateLoadResponderActor();
NActors::TActorId MakeLoadResponderActorId(ui32 node);
// load actor -- generates load with specific parameters
struct TLoadParams {
TString Name;
ui32 Channel;
- TVector<ui32> NodeHops; // node ids for the message route
- ui32 SizeMin, SizeMax; // min and max size for payloads
- ui32 InFlyMax; // maximum number of in fly messages
+ TVector<ui32> NodeHops; // node ids for the message route
+ ui32 SizeMin, SizeMax; // min and max size for payloads
+ ui32 InFlyMax; // maximum number of in fly messages
TDuration IntervalMin, IntervalMax; // min and max intervals between sending messages
- bool SoftLoad; // is the load soft?
- TDuration Duration; // test duration
+ bool SoftLoad; // is the load soft?
+ TDuration Duration; // test duration
bool UseProtobufWithPayload; // store payload separately
};
- NActors::IActor* CreateLoadActor(const TLoadParams& params);
+ NActors::IActor* CreateLoadActor(const TLoadParams& params);
-}
+}
diff --git a/library/cpp/actors/interconnect/logging.h b/library/cpp/actors/interconnect/logging.h
index 4d628ba859..c429d1cade 100644
--- a/library/cpp/actors/interconnect/logging.h
+++ b/library/cpp/actors/interconnect/logging.h
@@ -18,7 +18,7 @@
#define LOG_LOG_IC(component, marker, priority, ...) \
do { \
LOG_LOG(::NActors::TActivationContext::AsActorContext(), (priority), (component), "%s " marker " %s", LogPrefix.data(), Sprintf(__VA_ARGS__).data()); \
- } while (false)
+ } while (false)
#define LOG_LOG_NET(priority, NODE_ID, FMT, ...) \
do { \
@@ -58,8 +58,8 @@ namespace NActors {
TInterconnectLoggingBase(const TString& prefix)
: LogPrefix(prefix)
- {
- }
+ {
+ }
void SetPrefix(TString logPrefix) const {
logPrefix.swap(const_cast<TString&>(LogPrefix));
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index e7cbb453f6..4ba50a2b5f 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -16,10 +16,10 @@
#define FORCE_EVENT_CHECKSUM 0
#endif
-using NActors::IEventBase;
-using NActors::IEventHandle;
+using NActors::IEventBase;
+using NActors::IEventHandle;
using NActors::TActorId;
-using NActors::TConstIoVec;
+using NActors::TConstIoVec;
using NActors::TEventSerializedData;
Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) {
@@ -142,7 +142,7 @@ struct TEventHolder : TNonCopyable {
namespace NActors {
class TEventOutputChannel;
-}
+}
struct TTcpPacketOutTask : TNonCopyable {
const TSessionParams& Params;
@@ -192,9 +192,9 @@ public:
Orbit.Reset();
}
- bool IsEmpty() const {
+ bool IsEmpty() const {
return !DataSize;
- }
+ }
void SetMetadata(ui64 serial, ui64 confirm) {
ApplyToHeader([&](auto& header) {
@@ -261,7 +261,7 @@ public:
bool DropBufs(size_t& amount) {
while (BufferIndex != Bufs.size()) {
- TConstIoVec& item = Bufs[BufferIndex];
+ TConstIoVec& item = Bufs[BufferIndex];
// calculate number of bytes to the end in current buffer
const size_t remain = item.Size - FirstBufferOffset;
if (amount >= remain) {
@@ -285,11 +285,11 @@ public:
TriedWriting = false;
}
- template <typename TVectorType>
- void AppendToIoVector(TVectorType& vector, size_t max) {
+ template <typename TVectorType>
+ void AppendToIoVector(TVectorType& vector, size_t max) {
for (size_t k = BufferIndex, offset = FirstBufferOffset; k != Bufs.size() && vector.size() < max; ++k, offset = 0) {
TConstIoVec v = Bufs[k];
- v.Data = static_cast<const char*>(v.Data) + offset;
+ v.Data = static_cast<const char*>(v.Data) + offset;
v.Size -= offset;
vector.push_back(v);
}
diff --git a/library/cpp/actors/interconnect/poller.h b/library/cpp/actors/interconnect/poller.h
index 476a7a5e92..ff7979369f 100644
--- a/library/cpp/actors/interconnect/poller.h
+++ b/library/cpp/actors/interconnect/poller.h
@@ -4,20 +4,20 @@
#include <library/cpp/actors/core/events.h>
namespace NActors {
- class TSharedDescriptor: public TThrRefBase {
- public:
- virtual int GetDescriptor() = 0;
- };
+ class TSharedDescriptor: public TThrRefBase {
+ public:
+ virtual int GetDescriptor() = 0;
+ };
- using TDelegate = std::function<void()>;
- using TFDDelegate = std::function<TDelegate(const TIntrusivePtr<TSharedDescriptor>&)>;
+ using TDelegate = std::function<void()>;
+ using TFDDelegate = std::function<TDelegate(const TIntrusivePtr<TSharedDescriptor>&)>;
- class IPoller: public TThrRefBase {
- public:
- virtual ~IPoller() = default;
+ class IPoller: public TThrRefBase {
+ public:
+ virtual ~IPoller() = default;
- virtual void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0;
- virtual void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0;
- };
+ virtual void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0;
+ virtual void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0;
+ };
}
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index 930d334821..e75cbcaef4 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -36,7 +36,7 @@ namespace NActors {
const TActorId WriteActorId;
std::atomic_uint32_t Flags = 0;
- TSocketRecord(TEvPollerRegister& ev)
+ TSocketRecord(TEvPollerRegister& ev)
: Socket(std::move(ev.Socket))
, ReadActorId(ev.ReadActorId)
, WriteActorId(ev.WriteActorId)
@@ -287,8 +287,8 @@ namespace NActors {
Impl->Request(read, write);
}
- IActor* CreatePollerActor() {
+ IActor* CreatePollerActor() {
return new TPollerActor;
}
-}
+}
diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h
index 899dbb73e1..f927b82089 100644
--- a/library/cpp/actors/interconnect/poller_actor.h
+++ b/library/cpp/actors/interconnect/poller_actor.h
@@ -53,11 +53,11 @@ namespace NActors {
{}
};
- IActor* CreatePollerActor();
+ IActor* CreatePollerActor();
inline TActorId MakePollerActorId() {
char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'};
return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
}
-}
+}
diff --git a/library/cpp/actors/interconnect/poller_tcp.cpp b/library/cpp/actors/interconnect/poller_tcp.cpp
index 456bca8b98..8267df31ea 100644
--- a/library/cpp/actors/interconnect/poller_tcp.cpp
+++ b/library/cpp/actors/interconnect/poller_tcp.cpp
@@ -1,35 +1,35 @@
#include "poller_tcp.h"
namespace NInterconnect {
- TPollerThreads::TPollerThreads(size_t units, bool useSelect)
- : Units(units)
- {
- Y_VERIFY_DEBUG(!Units.empty());
- for (auto& unit : Units)
- unit = TPollerUnit::Make(useSelect);
- }
+ TPollerThreads::TPollerThreads(size_t units, bool useSelect)
+ : Units(units)
+ {
+ Y_VERIFY_DEBUG(!Units.empty());
+ for (auto& unit : Units)
+ unit = TPollerUnit::Make(useSelect);
+ }
- TPollerThreads::~TPollerThreads() {
- }
+ TPollerThreads::~TPollerThreads() {
+ }
- void TPollerThreads::Start() {
- for (const auto& unit : Units)
- unit->Start();
- }
+ void TPollerThreads::Start() {
+ for (const auto& unit : Units)
+ unit->Start();
+ }
- void TPollerThreads::Stop() {
- for (const auto& unit : Units)
- unit->Stop();
- }
+ void TPollerThreads::Stop() {
+ for (const auto& unit : Units)
+ unit->Stop();
+ }
- void TPollerThreads::StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) {
- auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()];
- unit->StartReadOperation(s, std::move(operation));
- }
+ void TPollerThreads::StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) {
+ auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()];
+ unit->StartReadOperation(s, std::move(operation));
+ }
- void TPollerThreads::StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) {
- auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()];
- unit->StartWriteOperation(s, std::move(operation));
- }
+ void TPollerThreads::StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) {
+ auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()];
+ unit->StartWriteOperation(s, std::move(operation));
+ }
}
diff --git a/library/cpp/actors/interconnect/poller_tcp.h b/library/cpp/actors/interconnect/poller_tcp.h
index 2ae8631237..310265eccd 100644
--- a/library/cpp/actors/interconnect/poller_tcp.h
+++ b/library/cpp/actors/interconnect/poller_tcp.h
@@ -7,19 +7,19 @@
#include <util/generic/hash.h>
namespace NInterconnect {
- class TPollerThreads: public NActors::IPoller {
- public:
- TPollerThreads(size_t units = 1U, bool useSelect = false);
- ~TPollerThreads();
+ class TPollerThreads: public NActors::IPoller {
+ public:
+ TPollerThreads(size_t units = 1U, bool useSelect = false);
+ ~TPollerThreads();
- void Start();
- void Stop();
+ void Start();
+ void Stop();
- void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override;
- void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override;
+ void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override;
+ void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override;
- private:
- TVector<TPollerUnit::TPtr> Units;
- };
+ private:
+ TVector<TPollerUnit::TPtr> Units;
+ };
}
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.cpp b/library/cpp/actors/interconnect/poller_tcp_unit.cpp
index 36180353b6..59e7dda810 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit.cpp
+++ b/library/cpp/actors/interconnect/poller_tcp_unit.cpp
@@ -1,7 +1,7 @@
#include "poller_tcp_unit.h"
#if !defined(_win_) && !defined(_darwin_)
-#include "poller_tcp_unit_epoll.h"
+#include "poller_tcp_unit_epoll.h"
#endif
#include "poller_tcp_unit_select.h"
@@ -11,116 +11,116 @@
#include <library/cpp/actors/util/intrinsics.h>
#if defined _linux_
-#include <pthread.h>
+#include <pthread.h>
#endif
namespace NInterconnect {
- TPollerUnit::TPtr
- TPollerUnit::Make(bool useSelect) {
+ TPollerUnit::TPtr
+ TPollerUnit::Make(bool useSelect) {
#if defined(_win_) || defined(_darwin_)
- Y_UNUSED(useSelect);
- return TPtr(new TPollerUnitSelect);
+ Y_UNUSED(useSelect);
+ return TPtr(new TPollerUnitSelect);
#else
- return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll);
+ return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll);
#endif
- }
-
- TPollerUnit::TPollerUnit()
- : StopFlag(true)
- , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read"))
- , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write"))
- {
- }
-
- TPollerUnit::~TPollerUnit() {
- if (!AtomicLoad(&StopFlag))
- Stop();
- }
-
- void
- TPollerUnit::Start() {
- AtomicStore(&StopFlag, false);
- ReadLoop.Start();
- WriteLoop.Start();
- }
-
- void
- TPollerUnit::Stop() {
- AtomicStore(&StopFlag, true);
- ReadLoop.Join();
- WriteLoop.Join();
- }
-
- template <>
- TPollerUnit::TSide&
- TPollerUnit::GetSide<false>() {
- return Read;
- }
-
- template <>
- TPollerUnit::TSide&
- TPollerUnit::GetSide<true>() {
- return Write;
- }
-
- void
- TPollerUnit::StartReadOperation(
+ }
+
+ TPollerUnit::TPollerUnit()
+ : StopFlag(true)
+ , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read"))
+ , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write"))
+ {
+ }
+
+ TPollerUnit::~TPollerUnit() {
+ if (!AtomicLoad(&StopFlag))
+ Stop();
+ }
+
+ void
+ TPollerUnit::Start() {
+ AtomicStore(&StopFlag, false);
+ ReadLoop.Start();
+ WriteLoop.Start();
+ }
+
+ void
+ TPollerUnit::Stop() {
+ AtomicStore(&StopFlag, true);
+ ReadLoop.Join();
+ WriteLoop.Join();
+ }
+
+ template <>
+ TPollerUnit::TSide&
+ TPollerUnit::GetSide<false>() {
+ return Read;
+ }
+
+ template <>
+ TPollerUnit::TSide&
+ TPollerUnit::GetSide<true>() {
+ return Write;
+ }
+
+ void
+ TPollerUnit::StartReadOperation(
const TIntrusivePtr<TSharedDescriptor>& stream,
- TFDDelegate&& operation) {
- Y_VERIFY_DEBUG(stream);
- if (AtomicLoad(&StopFlag))
- return;
- GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
- }
-
- void
- TPollerUnit::StartWriteOperation(
+ TFDDelegate&& operation) {
+ Y_VERIFY_DEBUG(stream);
+ if (AtomicLoad(&StopFlag))
+ return;
+ GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
+ }
+
+ void
+ TPollerUnit::StartWriteOperation(
const TIntrusivePtr<TSharedDescriptor>& stream,
- TFDDelegate&& operation) {
- Y_VERIFY_DEBUG(stream);
- if (AtomicLoad(&StopFlag))
- return;
- GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
- }
-
- template <bool IsWrite>
- void*
- TPollerUnit::IdleThread(void* param) {
+ TFDDelegate&& operation) {
+ Y_VERIFY_DEBUG(stream);
+ if (AtomicLoad(&StopFlag))
+ return;
+ GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
+ }
+
+ template <bool IsWrite>
+ void*
+ TPollerUnit::IdleThread(void* param) {
// TODO: musl-libc version of `sched_param` struct is for some reason different from pthread
// version in Ubuntu 12.04
#if defined(_linux_) && !defined(_musl_)
- pthread_t threadSelf = pthread_self();
- sched_param sparam = {20};
- pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam);
+ pthread_t threadSelf = pthread_self();
+ sched_param sparam = {20};
+ pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam);
#endif
- static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>();
- return nullptr;
- }
+ static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>();
+ return nullptr;
+ }
- template <>
- void
- TPollerUnit::RunLoop<false>() {
+ template <>
+ void
+ TPollerUnit::RunLoop<false>() {
NProfiling::TMemoryTagScope tag("INTERCONNECT_RECEIVED_DATA");
- while (!AtomicLoad(&StopFlag))
- ProcessRead();
- }
+ while (!AtomicLoad(&StopFlag))
+ ProcessRead();
+ }
- template <>
- void
- TPollerUnit::RunLoop<true>() {
+ template <>
+ void
+ TPollerUnit::RunLoop<true>() {
NProfiling::TMemoryTagScope tag("INTERCONNECT_SEND_DATA");
- while (!AtomicLoad(&StopFlag))
- ProcessWrite();
- }
-
- void
- TPollerUnit::TSide::ProcessInput() {
- if (!InputQueue.IsEmpty())
- do {
- auto sock = InputQueue.Top().first->GetDescriptor();
- if (!Operations.emplace(sock, std::move(InputQueue.Top())).second)
- Y_FAIL("Descriptor is already in pooler.");
- } while (InputQueue.Pop());
- }
+ while (!AtomicLoad(&StopFlag))
+ ProcessWrite();
+ }
+
+ void
+ TPollerUnit::TSide::ProcessInput() {
+ if (!InputQueue.IsEmpty())
+ do {
+ auto sock = InputQueue.Top().first->GetDescriptor();
+ if (!Operations.emplace(sock, std::move(InputQueue.Top())).second)
+ Y_FAIL("Descriptor is already in pooler.");
+ } while (InputQueue.Pop());
+ }
}
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.h b/library/cpp/actors/interconnect/poller_tcp_unit.h
index d3f0d0fd0a..692168b968 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit.h
+++ b/library/cpp/actors/interconnect/poller_tcp_unit.h
@@ -10,58 +10,58 @@
#include <unordered_map>
namespace NInterconnect {
- using NActors::TFDDelegate;
- using NActors::TSharedDescriptor;
+ using NActors::TFDDelegate;
+ using NActors::TSharedDescriptor;
- class TPollerUnit {
- public:
- typedef std::unique_ptr<TPollerUnit> TPtr;
+ class TPollerUnit {
+ public:
+ typedef std::unique_ptr<TPollerUnit> TPtr;
- static TPtr Make(bool useSelect);
+ static TPtr Make(bool useSelect);
- void Start();
- void Stop();
+ void Start();
+ void Stop();
- virtual void StartReadOperation(
- const TIntrusivePtr<TSharedDescriptor>& stream,
- TFDDelegate&& operation);
+ virtual void StartReadOperation(
+ const TIntrusivePtr<TSharedDescriptor>& stream,
+ TFDDelegate&& operation);
- virtual void StartWriteOperation(
- const TIntrusivePtr<TSharedDescriptor>& stream,
- TFDDelegate&& operation);
+ virtual void StartWriteOperation(
+ const TIntrusivePtr<TSharedDescriptor>& stream,
+ TFDDelegate&& operation);
- virtual ~TPollerUnit();
+ virtual ~TPollerUnit();
- private:
- virtual void ProcessRead() = 0;
- virtual void ProcessWrite() = 0;
+ private:
+ virtual void ProcessRead() = 0;
+ virtual void ProcessWrite() = 0;
- template <bool IsWrite>
- static void* IdleThread(void* param);
+ template <bool IsWrite>
+ static void* IdleThread(void* param);
- template <bool IsWrite>
- void RunLoop();
+ template <bool IsWrite>
+ void RunLoop();
- volatile bool StopFlag;
- TThread ReadLoop, WriteLoop;
+ volatile bool StopFlag;
+ TThread ReadLoop, WriteLoop;
- protected:
- TPollerUnit();
+ protected:
+ TPollerUnit();
- struct TSide {
- using TOperations =
- std::unordered_map<SOCKET,
- std::pair<TIntrusivePtr<TSharedDescriptor>, TFDDelegate>>;
+ struct TSide {
+ using TOperations =
+ std::unordered_map<SOCKET,
+ std::pair<TIntrusivePtr<TSharedDescriptor>, TFDDelegate>>;
- TOperations Operations;
- using TItem = TOperations::mapped_type;
- TFunnelQueue<TItem> InputQueue;
+ TOperations Operations;
+ using TItem = TOperations::mapped_type;
+ TFunnelQueue<TItem> InputQueue;
- void ProcessInput();
- } Read, Write;
+ void ProcessInput();
+ } Read, Write;
- template <bool IsWrite>
- TSide& GetSide();
- };
+ template <bool IsWrite>
+ TSide& GetSide();
+ };
}
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp
index c0c4524f1e..c78538b95b 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp
+++ b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp
@@ -8,117 +8,117 @@
#include <cstring>
namespace NInterconnect {
- namespace {
- void
- DeleteEpoll(int epoll, SOCKET stream) {
- ::epoll_event event = {0, {.fd = stream}};
- if (::epoll_ctl(epoll, EPOLL_CTL_DEL, stream, &event)) {
- Cerr << "epoll_ctl errno: " << errno << Endl;
- Y_FAIL("epoll delete error!");
- }
- }
-
- template <ui32 Events>
- void
- AddEpoll(int epoll, SOCKET stream) {
+ namespace {
+ void
+ DeleteEpoll(int epoll, SOCKET stream) {
+ ::epoll_event event = {0, {.fd = stream}};
+ if (::epoll_ctl(epoll, EPOLL_CTL_DEL, stream, &event)) {
+ Cerr << "epoll_ctl errno: " << errno << Endl;
+ Y_FAIL("epoll delete error!");
+ }
+ }
+
+ template <ui32 Events>
+ void
+ AddEpoll(int epoll, SOCKET stream) {
::epoll_event event = {.events = Events};
- event.data.fd = stream;
- if (::epoll_ctl(epoll, EPOLL_CTL_ADD, stream, &event)) {
- Cerr << "epoll_ctl errno: " << errno << Endl;
- Y_FAIL("epoll add error!");
- }
- }
-
- int
- Initialize() {
- const auto epoll = ::epoll_create(10000);
- Y_VERIFY_DEBUG(epoll > 0);
- return epoll;
- }
-
+ event.data.fd = stream;
+ if (::epoll_ctl(epoll, EPOLL_CTL_ADD, stream, &event)) {
+ Cerr << "epoll_ctl errno: " << errno << Endl;
+ Y_FAIL("epoll add error!");
+ }
+ }
+
+ int
+ Initialize() {
+ const auto epoll = ::epoll_create(10000);
+ Y_VERIFY_DEBUG(epoll > 0);
+ return epoll;
+ }
+
}
- TPollerUnitEpoll::TPollerUnitEpoll()
- : ReadDescriptor(Initialize())
- , WriteDescriptor(Initialize())
- {
- // Block on the epoll descriptor.
- ::sigemptyset(&sigmask);
- ::sigaddset(&sigmask, SIGPIPE);
- ::sigaddset(&sigmask, SIGTERM);
+ TPollerUnitEpoll::TPollerUnitEpoll()
+ : ReadDescriptor(Initialize())
+ , WriteDescriptor(Initialize())
+ {
+ // Block on the epoll descriptor.
+ ::sigemptyset(&sigmask);
+ ::sigaddset(&sigmask, SIGPIPE);
+ ::sigaddset(&sigmask, SIGTERM);
}
- TPollerUnitEpoll::~TPollerUnitEpoll() {
- ::close(ReadDescriptor);
- ::close(WriteDescriptor);
- }
+ TPollerUnitEpoll::~TPollerUnitEpoll() {
+ ::close(ReadDescriptor);
+ ::close(WriteDescriptor);
+ }
- template <>
- int TPollerUnitEpoll::GetDescriptor<false>() const {
- return ReadDescriptor;
- }
+ template <>
+ int TPollerUnitEpoll::GetDescriptor<false>() const {
+ return ReadDescriptor;
+ }
- template <>
- int TPollerUnitEpoll::GetDescriptor<true>() const {
- return WriteDescriptor;
- }
+ template <>
+ int TPollerUnitEpoll::GetDescriptor<true>() const {
+ return WriteDescriptor;
+ }
- void
- TPollerUnitEpoll::StartReadOperation(
+ void
+ TPollerUnitEpoll::StartReadOperation(
const TIntrusivePtr<TSharedDescriptor>& s,
- TFDDelegate&& operation) {
- TPollerUnit::StartReadOperation(s, std::move(operation));
- AddEpoll<EPOLLRDHUP | EPOLLIN>(ReadDescriptor, s->GetDescriptor());
- }
+ TFDDelegate&& operation) {
+ TPollerUnit::StartReadOperation(s, std::move(operation));
+ AddEpoll<EPOLLRDHUP | EPOLLIN>(ReadDescriptor, s->GetDescriptor());
+ }
- void
- TPollerUnitEpoll::StartWriteOperation(
+ void
+ TPollerUnitEpoll::StartWriteOperation(
const TIntrusivePtr<TSharedDescriptor>& s,
- TFDDelegate&& operation) {
- TPollerUnit::StartWriteOperation(s, std::move(operation));
- AddEpoll<EPOLLRDHUP | EPOLLOUT>(WriteDescriptor, s->GetDescriptor());
- }
-
- constexpr int EVENTS_BUF_SIZE = 128;
-
- template <bool WriteOp>
- void
- TPollerUnitEpoll::Process() {
- ::epoll_event events[EVENTS_BUF_SIZE];
-
- const int epoll = GetDescriptor<WriteOp>();
-
- /* Timeout just to check StopFlag sometimes */
- const int result =
- ::epoll_pwait(epoll, events, EVENTS_BUF_SIZE, 200, &sigmask);
-
- if (result == -1 && errno != EINTR)
- Y_FAIL("epoll wait error!");
-
- auto& side = GetSide<WriteOp>();
- side.ProcessInput();
-
- for (int i = 0; i < result; ++i) {
- const auto it = side.Operations.find(events[i].data.fd);
- if (side.Operations.end() == it)
- continue;
- if (const auto& finalizer = it->second.second(it->second.first)) {
- DeleteEpoll(epoll, it->first);
- side.Operations.erase(it);
- finalizer();
- }
+ TFDDelegate&& operation) {
+ TPollerUnit::StartWriteOperation(s, std::move(operation));
+ AddEpoll<EPOLLRDHUP | EPOLLOUT>(WriteDescriptor, s->GetDescriptor());
+ }
+
+ constexpr int EVENTS_BUF_SIZE = 128;
+
+ template <bool WriteOp>
+ void
+ TPollerUnitEpoll::Process() {
+ ::epoll_event events[EVENTS_BUF_SIZE];
+
+ const int epoll = GetDescriptor<WriteOp>();
+
+ /* Timeout just to check StopFlag sometimes */
+ const int result =
+ ::epoll_pwait(epoll, events, EVENTS_BUF_SIZE, 200, &sigmask);
+
+ if (result == -1 && errno != EINTR)
+ Y_FAIL("epoll wait error!");
+
+ auto& side = GetSide<WriteOp>();
+ side.ProcessInput();
+
+ for (int i = 0; i < result; ++i) {
+ const auto it = side.Operations.find(events[i].data.fd);
+ if (side.Operations.end() == it)
+ continue;
+ if (const auto& finalizer = it->second.second(it->second.first)) {
+ DeleteEpoll(epoll, it->first);
+ side.Operations.erase(it);
+ finalizer();
+ }
}
}
- void
- TPollerUnitEpoll::ProcessRead() {
- Process<false>();
- }
+ void
+ TPollerUnitEpoll::ProcessRead() {
+ Process<false>();
+ }
- void
- TPollerUnitEpoll::ProcessWrite() {
- Process<true>();
- }
+ void
+ TPollerUnitEpoll::ProcessWrite() {
+ Process<true>();
+ }
}
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h
index 70aee1c66b..ff7893eba2 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h
+++ b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h
@@ -3,31 +3,31 @@
#include "poller_tcp_unit.h"
namespace NInterconnect {
- class TPollerUnitEpoll: public TPollerUnit {
- public:
- TPollerUnitEpoll();
- virtual ~TPollerUnitEpoll();
+ class TPollerUnitEpoll: public TPollerUnit {
+ public:
+ TPollerUnitEpoll();
+ virtual ~TPollerUnitEpoll();
- private:
- virtual void StartReadOperation(
- const TIntrusivePtr<TSharedDescriptor>& s,
- TFDDelegate&& operation) override;
+ private:
+ virtual void StartReadOperation(
+ const TIntrusivePtr<TSharedDescriptor>& s,
+ TFDDelegate&& operation) override;
- virtual void StartWriteOperation(
- const TIntrusivePtr<TSharedDescriptor>& s,
- TFDDelegate&& operation) override;
+ virtual void StartWriteOperation(
+ const TIntrusivePtr<TSharedDescriptor>& s,
+ TFDDelegate&& operation) override;
- virtual void ProcessRead() override;
- virtual void ProcessWrite() override;
+ virtual void ProcessRead() override;
+ virtual void ProcessWrite() override;
- template <bool Write>
- void Process();
+ template <bool Write>
+ void Process();
- template <bool Write>
- int GetDescriptor() const;
+ template <bool Write>
+ int GetDescriptor() const;
- const int ReadDescriptor, WriteDescriptor;
- ::sigset_t sigmask;
- };
+ const int ReadDescriptor, WriteDescriptor;
+ ::sigset_t sigmask;
+ };
}
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp b/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp
index f0acc6d338..ae7aaad566 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp
+++ b/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp
@@ -15,72 +15,72 @@ typedef timeval TIMEVAL;
#endif
namespace NInterconnect {
- TPollerUnitSelect::TPollerUnitSelect() {
- }
+ TPollerUnitSelect::TPollerUnitSelect() {
+ }
- TPollerUnitSelect::~TPollerUnitSelect() {
- }
+ TPollerUnitSelect::~TPollerUnitSelect() {
+ }
- template <bool IsWrite>
- void
- TPollerUnitSelect::Process() {
- auto& side = GetSide<IsWrite>();
- side.ProcessInput();
+ template <bool IsWrite>
+ void
+ TPollerUnitSelect::Process() {
+ auto& side = GetSide<IsWrite>();
+ side.ProcessInput();
- enum : size_t { R,
- W,
- E };
- static const auto O = IsWrite ? W : R;
+ enum : size_t { R,
+ W,
+ E };
+ static const auto O = IsWrite ? W : R;
- ::fd_set sets[3];
+ ::fd_set sets[3];
- FD_ZERO(&sets[R]);
- FD_ZERO(&sets[W]);
- FD_ZERO(&sets[E]);
+ FD_ZERO(&sets[R]);
+ FD_ZERO(&sets[W]);
+ FD_ZERO(&sets[E]);
- for (const auto& operation : side.Operations) {
- FD_SET(operation.first, &sets[O]);
- FD_SET(operation.first, &sets[E]);
- }
+ for (const auto& operation : side.Operations) {
+ FD_SET(operation.first, &sets[O]);
+ FD_SET(operation.first, &sets[E]);
+ }
#if defined(_win_)
- ::TIMEVAL timeout = {0L, 99991L};
- const auto numberEvents = !side.Operations.empty() ? ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout)
- : (::Sleep(100), 0);
+ ::TIMEVAL timeout = {0L, 99991L};
+ const auto numberEvents = !side.Operations.empty() ? ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout)
+ : (::Sleep(100), 0);
#elif defined(_darwin_)
- ::TIMEVAL timeout = {0L, 99991L};
- const auto numberEvents = ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout);
+ ::TIMEVAL timeout = {0L, 99991L};
+ const auto numberEvents = ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout);
#else
- ::sigset_t sigmask;
- ::sigemptyset(&sigmask);
- ::sigaddset(&sigmask, SIGPIPE);
- ::sigaddset(&sigmask, SIGTERM);
+ ::sigset_t sigmask;
+ ::sigemptyset(&sigmask);
+ ::sigaddset(&sigmask, SIGPIPE);
+ ::sigaddset(&sigmask, SIGTERM);
- struct ::timespec timeout = {0L, 99999989L};
- const auto numberEvents = ::pselect(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout, &sigmask);
+ struct ::timespec timeout = {0L, 99999989L};
+ const auto numberEvents = ::pselect(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout, &sigmask);
#endif
- Y_VERIFY_DEBUG(numberEvents >= 0);
-
- for (auto it = side.Operations.cbegin(); side.Operations.cend() != it;) {
- if (FD_ISSET(it->first, &sets[O]) || FD_ISSET(it->first, &sets[E]))
- if (const auto& finalizer = it->second.second(it->second.first)) {
- side.Operations.erase(it++);
- finalizer();
- continue;
- }
- ++it;
- }
+ Y_VERIFY_DEBUG(numberEvents >= 0);
+
+ for (auto it = side.Operations.cbegin(); side.Operations.cend() != it;) {
+ if (FD_ISSET(it->first, &sets[O]) || FD_ISSET(it->first, &sets[E]))
+ if (const auto& finalizer = it->second.second(it->second.first)) {
+ side.Operations.erase(it++);
+ finalizer();
+ continue;
+ }
+ ++it;
+ }
}
- void
- TPollerUnitSelect::ProcessRead() {
- Process<false>();
- }
+ void
+ TPollerUnitSelect::ProcessRead() {
+ Process<false>();
+ }
- void
- TPollerUnitSelect::ProcessWrite() {
- Process<true>();
- }
+ void
+ TPollerUnitSelect::ProcessWrite() {
+ Process<true>();
+ }
}
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_select.h b/library/cpp/actors/interconnect/poller_tcp_unit_select.h
index 7b6d5b3996..0c15217796 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit_select.h
+++ b/library/cpp/actors/interconnect/poller_tcp_unit_select.h
@@ -3,17 +3,17 @@
#include "poller_tcp_unit.h"
namespace NInterconnect {
- class TPollerUnitSelect: public TPollerUnit {
- public:
- TPollerUnitSelect();
- virtual ~TPollerUnitSelect();
+ class TPollerUnitSelect: public TPollerUnit {
+ public:
+ TPollerUnitSelect();
+ virtual ~TPollerUnitSelect();
- private:
- virtual void ProcessRead() override;
- virtual void ProcessWrite() override;
+ private:
+ virtual void ProcessRead() override;
+ virtual void ProcessWrite() override;
- template <bool IsWrite>
- void Process();
- };
+ template <bool IsWrite>
+ void Process();
+ };
}
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
index d17755cad1..2b6d27cd3f 100644
--- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
+++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -9,14 +9,14 @@
#include <util/generic/noncopyable.h>
-class TTestICCluster: public TNonCopyable {
+class TTestICCluster: public TNonCopyable {
public:
struct TTrafficInterrupterSettings {
TDuration RejectingTrafficTimeout;
double BandWidth;
bool Disconnect;
};
-
+
private:
const ui32 NumNodes;
const TString Address = "::1";
@@ -29,7 +29,7 @@ private:
public:
TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(),
- TTrafficInterrupterSettings* tiSettings = nullptr)
+ TTrafficInterrupterSettings* tiSettings = nullptr)
: NumNodes(numNodes)
, Counters(new NMonitoring::TDynamicCounters)
, ChannelsConfig(channelsConfig)
@@ -67,7 +67,7 @@ public:
return Nodes[id].Get();
}
- ~TTestICCluster() {
+ ~TTestICCluster() {
}
TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h
index a4a4373f3e..48851de2c5 100644
--- a/library/cpp/actors/interconnect/ut/lib/interrupter.h
+++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h
@@ -26,7 +26,7 @@ class TTrafficInterrupter
TVector<char> Data;
};
struct TCompare {
- bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const {
+ bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const {
return x.first > y.first;
};
};
@@ -160,7 +160,7 @@ private:
timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now());
}
};
- for (auto& it : Connections) {
+ for (auto& it : Connections) {
updateTimout(it.ForwardConnection);
updateTimout(it.BackwardConnection);
}
@@ -177,20 +177,20 @@ private:
}
if (DelayTraffic) { // process packets from DelayQueues
auto processDelayedPackages = [](TDirectedConnection& conn) {
- while (!conn.DelayedQueue.empty()) {
- auto& frontPackage = conn.DelayedQueue.top();
- if (TInstant::Now() >= frontPackage.first) {
- TInet6StreamSocket* sock = frontPackage.second.ForwardSocket;
- if (sock) {
+ while (!conn.DelayedQueue.empty()) {
+ auto& frontPackage = conn.DelayedQueue.top();
+ if (TInstant::Now() >= frontPackage.first) {
+ TInet6StreamSocket* sock = frontPackage.second.ForwardSocket;
+ if (sock) {
sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size());
}
- conn.DelayedQueue.pop();
- } else {
- break;
+ conn.DelayedQueue.pop();
+ } else {
+ break;
}
- }
- };
- for (auto& it : Connections) {
+ }
+ };
+ for (auto& it : Connections) {
processDelayedPackages(it.ForwardConnection);
processDelayedPackages(it.BackwardConnection);
}
@@ -228,7 +228,7 @@ private:
if (recvSize > 0) {
if (DelayTraffic) {
// put packet into DelayQueue
- const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth);
+ const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth);
const TInstant now = TInstant::Now();
directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay;
TDelayedPacket pkt;
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index cd3a39a6c0..ff30b1445e 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -63,7 +63,7 @@ public:
TMailboxType::ReadAsFilled, 0));
const TActorId loggerActorId(0, "logger");
- constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
+ constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
auto loggerSettings = MakeIntrusive<NLog::TSettings>(
loggerActorId,
@@ -78,7 +78,7 @@ public:
NActorsServices::EServiceCommon_Name
);
- constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
+ constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
static const TString WilsonComponentName = "WILSON";
loggerSettings->Append(
diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h
index bcb3504ebe..7591200471 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_actors.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h
@@ -1,56 +1,56 @@
#pragma once
namespace NActors {
- class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {
- protected:
+ class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {
+ protected:
const TActorId RecipientActorId;
- const ui32 Preload;
- ui64 SequenceNumber = 0;
- ui32 InFlySize = 0;
+ const ui32 Preload;
+ ui64 SequenceNumber = 0;
+ ui32 InFlySize = 0;
- public:
+ public:
TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
- : RecipientActorId(recipientActorId)
- , Preload(preload)
- {
- }
+ : RecipientActorId(recipientActorId)
+ , Preload(preload)
+ {
+ }
- virtual ~TSenderBaseActor() {
- }
+ virtual ~TSenderBaseActor() {
+ }
- virtual void Bootstrap(const TActorContext& ctx) {
- Become(&TSenderBaseActor::StateFunc);
+ virtual void Bootstrap(const TActorContext& ctx) {
+ Become(&TSenderBaseActor::StateFunc);
ctx.Send(ctx.ExecutorThread.ActorSystem->InterconnectProxy(RecipientActorId.NodeId()), new TEvInterconnect::TEvConnectNode);
- }
+ }
- virtual void SendMessagesIfPossible(const TActorContext& ctx) {
- while (InFlySize < Preload) {
- SendMessage(ctx);
- }
+ virtual void SendMessagesIfPossible(const TActorContext& ctx) {
+ while (InFlySize < Preload) {
+ SendMessage(ctx);
+ }
}
- virtual void SendMessage(const TActorContext& /*ctx*/) {
- ++SequenceNumber;
- }
+ virtual void SendMessage(const TActorContext& /*ctx*/) {
+ ++SequenceNumber;
+ }
- virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) {
- SendMessage(ctx);
- }
+ virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) {
+ SendMessage(ctx);
+ }
- virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) {
- SendMessagesIfPossible(ctx);
- }
+ virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) {
+ SendMessagesIfPossible(ctx);
+ }
void Handle(TEvInterconnect::TEvNodeConnected::TPtr& /*ev*/, const TActorContext& ctx) {
SendMessagesIfPossible(ctx);
- }
+ }
- void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
- }
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
+ }
- virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
- Die(ctx);
- }
+ virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
+ Die(ctx);
+ }
virtual STRICT_STFUNC(StateFunc,
HFunc(TEvTestResponse, Handle)
@@ -59,25 +59,25 @@ namespace NActors {
HFunc(TEvInterconnect::TEvNodeConnected, Handle)
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle)
)
- };
+ };
- class TReceiverBaseActor: public TActor<TReceiverBaseActor> {
- protected:
- ui64 ReceivedCount = 0;
+ class TReceiverBaseActor: public TActor<TReceiverBaseActor> {
+ protected:
+ ui64 ReceivedCount = 0;
- public:
- TReceiverBaseActor()
- : TActor(&TReceiverBaseActor::StateFunc)
- {
- }
+ public:
+ TReceiverBaseActor()
+ : TActor(&TReceiverBaseActor::StateFunc)
+ {
+ }
- virtual ~TReceiverBaseActor() {
- }
+ virtual ~TReceiverBaseActor() {
+ }
virtual STRICT_STFUNC(StateFunc,
HFunc(TEvTest, Handle)
)
virtual void Handle(TEvTest::TPtr& /*ev*/, const TActorContext& /*ctx*/) {}
- };
+ };
}
diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h
index 29d52b20af..cd0d9e0152 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_events.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_events.h
@@ -2,7 +2,7 @@
#include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h>
-namespace NActors {
+namespace NActors {
enum {
EvTest = EventSpaceBegin(TEvents::ES_PRIVATE),
EvTestChan,
diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp
index c83f7c2ddf..5d19bc3003 100644
--- a/library/cpp/actors/interconnect/ut_fat/main.cpp
+++ b/library/cpp/actors/interconnect/ut_fat/main.cpp
@@ -61,13 +61,13 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record;
Y_VERIFY(record.HasConfirmedSequenceNumber());
if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) {
- while (record.GetConfirmedSequenceNumber() != InFly.front()) {
+ while (record.GetConfirmedSequenceNumber() != InFly.front()) {
InFly.pop_front();
--InFlySize;
}
}
Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64,
- record.GetConfirmedSequenceNumber(), InFly.front());
+ record.GetConfirmedSequenceNumber(), InFly.front());
InFly.pop_front();
--InFlySize;
SendMessagesIfPossible(ctx);
diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h
index 52376aa3fb..c190105a59 100644
--- a/library/cpp/actors/interconnect/watchdog_timer.h
+++ b/library/cpp/actors/interconnect/watchdog_timer.h
@@ -1,7 +1,7 @@
#pragma once
namespace NActors {
- template <typename TEvent>
+ template <typename TEvent>
class TWatchdogTimer {
using TCallback = std::function<void()>;
@@ -9,7 +9,7 @@ namespace NActors {
const TCallback Callback;
TInstant LastResetTimestamp;
- TEvent* ExpectedEvent = nullptr;
+ TEvent* ExpectedEvent = nullptr;
ui32 Iteration = 0;
static constexpr ui32 NumIterationsBeforeFiring = 2;
@@ -18,8 +18,8 @@ namespace NActors {
TWatchdogTimer(TDuration timeout, TCallback callback)
: Timeout(timeout)
, Callback(std::move(callback))
- {
- }
+ {
+ }
void Arm(const TActorIdentity& actor) {
if (Timeout != TDuration::Zero() && Timeout != TDuration::Max()) {
@@ -65,4 +65,4 @@ namespace NActors {
}
};
-}
+}