aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:02 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:02 +0300
commit3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch)
treec2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/actors/interconnect
parentab3783171cc30e262243a0227c86118f7080c896 (diff)
downloadydb-3e0b762a82514bac89c1dd6ea7211e381d8aa248.tar.gz
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/events_local.h28
-rw-r--r--library/cpp/actors/interconnect/interconnect.h10
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h8
-rw-r--r--library/cpp/actors/interconnect/interconnect_counters.cpp10
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp34
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_impl.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_mon.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_mon.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_nameserver_table.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp14
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h26
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.h6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h10
-rw-r--r--library/cpp/actors/interconnect/load.cpp14
-rw-r--r--library/cpp/actors/interconnect/load.h2
-rw-r--r--library/cpp/actors/interconnect/mock/ic_mock.cpp4
-rw-r--r--library/cpp/actors/interconnect/packet.h8
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp72
-rw-r--r--library/cpp/actors/interconnect/poller_actor.h4
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/large.cpp8
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h4
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h6
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_actors.h4
-rw-r--r--library/cpp/actors/interconnect/ut/poller_actor_ut.cpp272
-rw-r--r--library/cpp/actors/interconnect/ut/ya.make6
-rw-r--r--library/cpp/actors/interconnect/ut_fat/main.cpp6
-rw-r--r--library/cpp/actors/interconnect/ya.make8
34 files changed, 302 insertions, 302 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 8a46ffd535..a7da62c3d7 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -107,29 +107,29 @@ namespace NActors {
struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
- TEvHandshakeAsk(const TActorId& self,
- const TActorId& peer,
+ TEvHandshakeAsk(const TActorId& self,
+ const TActorId& peer,
ui64 counter)
: Self(self)
, Peer(peer)
, Counter(counter)
{
}
- const TActorId Self;
- const TActorId Peer;
+ const TActorId Self;
+ const TActorId Peer;
const ui64 Counter;
};
struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck")
- TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
+ TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
: Self(self)
, NextPacket(nextPacket)
, Params(std::move(params))
{}
- const TActorId Self;
+ const TActorId Self;
const ui64 NextPacket;
const TSessionParams Params;
};
@@ -185,8 +185,8 @@ namespace NActors {
TEvHandshakeDone(
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
- const TActorId& peer,
- const TActorId& self,
+ const TActorId& peer,
+ const TActorId& self,
ui64 nextPacket,
TAutoPtr<TProgramInfo>&& programInfo,
TSessionParams params)
@@ -200,8 +200,8 @@ namespace NActors {
}
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
- const TActorId Peer;
- const TActorId Self;
+ const TActorId Peer;
+ const TActorId Self;
const ui64 NextPacket;
TAutoPtr<TProgramInfo> ProgramInfo;
const TSessionParams Params;
@@ -319,10 +319,10 @@ namespace NActors {
template <typename TContainer>
TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) {
- for (const TActorId& actorId : route) {
+ for (const TActorId& actorId : route) {
auto* hop = Record.AddHops();
if (actorId) {
- ActorIdToProto(actorId, hop->MutableNextHop());
+ ActorIdToProto(actorId, hop->MutableNextHop());
}
}
Record.SetId(id);
@@ -366,13 +366,13 @@ namespace NActors {
};
struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {
- TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
+ TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
: SessionID(sessionId)
, BufferSize(outputBufferSize)
{
}
- TActorId SessionID;
+ TActorId SessionID;
ui64 BufferSize;
};
diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h
index 225a5243fd..f052a6e92e 100644
--- a/library/cpp/actors/interconnect/interconnect.h
+++ b/library/cpp/actors/interconnect/interconnect.h
@@ -10,7 +10,7 @@ namespace NActors {
TString SelfAddress;
ui32 SelfPort;
- TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time)
+ TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time)
};
struct TInterconnectProxySetup: public TThrRefBase {
@@ -41,12 +41,12 @@ namespace NActors {
TIntrusivePtr<TInterconnectGlobalState> GlobalState;
- virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls
+ virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls
virtual TActorSetupCmd CreateAcceptor() = 0;
};
struct TNameserverSetup {
- TActorId ServiceID;
+ TActorId ServiceID;
TIntrusivePtr<TInterconnectGlobalState> GlobalState;
};
@@ -118,12 +118,12 @@ namespace NActors {
};
struct TNodeRegistrarSetup {
- TActorId ServiceID;
+ TActorId ServiceID;
TIntrusivePtr<TInterconnectGlobalState> GlobalState;
};
- TActorId GetNameserviceActorId();
+ TActorId GetNameserviceActorId();
/**
* Const table-lookup based name service
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index e4a0ae3cda..659a6a9e5c 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -1,6 +1,6 @@
#pragma once
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/event_load.h>
#include <library/cpp/actors/util/rope.h>
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index 285709a00c..81e0694da1 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -3,7 +3,7 @@
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/util/datetime.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/generic/map.h>
#include <util/generic/set.h>
@@ -63,7 +63,7 @@ namespace NActors {
typedef TMap<ui16, TChannelSettings> TChannelsConfig;
using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title,
- TActorSystem* actorSystem, const TActorId& actorId)>;
+ TActorSystem* actorSystem, const TActorId& actorId)>;
using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>;
@@ -71,13 +71,13 @@ namespace NActors {
bool orange, bool red, TActorSystem* actorSystem)>;
struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> {
- TActorId NameserviceId;
+ TActorId NameserviceId;
NMonitoring::TDynamicCounterPtr MonCounters;
std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
TChannelsConfig ChannelsConfig;
TInterconnectSettings Settings;
TRegisterMonPageCallback RegisterMonPage;
- TActorId DestructorId;
+ TActorId DestructorId;
std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize;
TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024;
TString ClusterUUID;
diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp
index 224160d4b4..e389e93688 100644
--- a/library/cpp/actors/interconnect/interconnect_counters.cpp
+++ b/library/cpp/actors/interconnect/interconnect_counters.cpp
@@ -619,11 +619,11 @@ namespace {
TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read");
for (const char *reason : TDisconnectReason::Reasons) {
- DisconnectByReason_[reason] = Metrics_->Rate(
- NMonitoring::MakeLabels({
- {"sensor", "interconnect.disconnect_reason"},
- {"reason", reason},
- }));
+ DisconnectByReason_[reason] = Metrics_->Rate(
+ NMonitoring::MakeLabels({
+ {"sensor", "interconnect.disconnect_reason"},
+ {"reason", reason},
+ }));
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 9ede998d8e..51d1e607bc 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -25,8 +25,8 @@ namespace NActors {
struct TInitialPacket {
struct {
- TActorId SelfVirtualId;
- TActorId PeerVirtualId;
+ TActorId SelfVirtualId;
+ TActorId PeerVirtualId;
ui64 NextPacket;
ui64 Version;
} Header;
@@ -34,7 +34,7 @@ namespace NActors {
TInitialPacket() = default;
- TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) {
+ TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) {
Header.SelfVirtualId = self;
Header.PeerVirtualId = peer;
Header.NextPacket = nextPacket;
@@ -79,8 +79,8 @@ namespace NActors {
private:
TInterconnectProxyCommon::TPtr Common;
- TActorId SelfVirtualId;
- TActorId PeerVirtualId;
+ TActorId SelfVirtualId;
+ TActorId PeerVirtualId;
ui32 PeerNodeId = 0;
ui64 NextPacketToPeer = 0;
TMaybe<ui64> NextPacketFromPeer; // will be obtained from incoming initial packet
@@ -102,7 +102,7 @@ namespace NActors {
return IActor::INTERCONNECT_HANDSHAKE;
}
- THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
+ THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
: TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
, Common(std::move(common))
@@ -377,7 +377,7 @@ namespace NActors {
// set up virtual self id to ensure peer will not drop our connection
char buf[12] = {'c', 'o', 'o', 'k', 'i', 'e', ' ', 'c', 'h', 'e', 'c', 'k'};
- SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12));
+ SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12));
bool success = true;
try {
@@ -401,7 +401,7 @@ namespace NActors {
request.SetProgramStartTime(0);
request.SetSerial(0);
request.SetReceiverNodeId(0);
- request.SetSenderActorId(TString());
+ request.SetSenderActorId(TString());
request.SetCookie(cookie);
request.SetDoCheckCookie(true);
SendExBlock(request, "SendExBlockDoCheckCookie");
@@ -419,7 +419,7 @@ namespace NActors {
}
// restore state
- SelfVirtualId = TActorId();
+ SelfVirtualId = TActorId();
std::swap(tempSocket, Socket);
std::swap(tempPollerToken, PollerToken);
return success;
@@ -455,7 +455,7 @@ namespace NActors {
request.SetProgramStartTime(Common->StartTime);
request.SetSerial(SelfVirtualId.LocalId());
request.SetReceiverNodeId(PeerNodeId);
- request.SetSenderActorId(SelfVirtualId.ToString());
+ request.SetSenderActorId(SelfVirtualId.ToString());
request.SetSenderHostName(Common->TechnicalSelfHostName);
request.SetReceiverHostName(PeerHostName);
@@ -519,7 +519,7 @@ namespace NActors {
ValidateClusterUUID(success, generateError);
ValidateVersionTag(success, generateError);
- const auto& s = success.GetSenderActorId();
+ const auto& s = success.GetSenderActorId();
PeerVirtualId.Parse(s.data(), s.size());
// recover flags
@@ -599,8 +599,8 @@ namespace NActors {
SendInitialPacket();
} else {
// peer wants a new session, clear fields and send initial packet
- SelfVirtualId = TActorId();
- PeerVirtualId = TActorId();
+ SelfVirtualId = TActorId();
+ PeerVirtualId = TActorId();
NextPacketToPeer = 0;
SendInitialPacket();
@@ -637,7 +637,7 @@ namespace NActors {
PeerHostName = request.GetSenderHostName();
// parse peer virtual id
- const auto& str = request.GetSenderActorId();
+ const auto& str = request.GetSenderActorId();
PeerVirtualId.Parse(str.data(), str.size());
// validate request
@@ -709,7 +709,7 @@ namespace NActors {
SendExBlock(record, "ExReply");
// extract sender actor id (self virtual id)
- const auto& str = success.GetSenderActorId();
+ const auto& str = success.GetSenderActorId();
SelfVirtualId.Parse(str.data(), str.size());
} else if (auto ev = reply->CastAsLocal<TEvHandshakeReplyError>()) {
// in case of error just send reply to the peer and terminate handshake
@@ -981,8 +981,8 @@ namespace NActors {
}
};
- IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
- const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
+ IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
+ const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
TSessionParams params) {
return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket,
std::move(peerHostName), std::move(params)));
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h
index b3c0db6c5d..7c5c25c3b8 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.h
+++ b/library/cpp/actors/interconnect/interconnect_handshake.h
@@ -15,8 +15,8 @@ namespace NActors {
using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>;
- IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
- const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
+ IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
+ const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
TSessionParams params);
IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket);
diff --git a/library/cpp/actors/interconnect/interconnect_impl.h b/library/cpp/actors/interconnect/interconnect_impl.h
index ee29e4d397..2ca0db8763 100644
--- a/library/cpp/actors/interconnect/interconnect_impl.h
+++ b/library/cpp/actors/interconnect/interconnect_impl.h
@@ -4,7 +4,7 @@
#include <library/cpp/actors/protos/interconnect.pb.h>
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/helpers/mon_histogram_helper.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NActors {
// resolve node info
diff --git a/library/cpp/actors/interconnect/interconnect_mon.cpp b/library/cpp/actors/interconnect/interconnect_mon.cpp
index cf924ccbf9..48823c5b0e 100644
--- a/library/cpp/actors/interconnect/interconnect_mon.cpp
+++ b/library/cpp/actors/interconnect/interconnect_mon.cpp
@@ -1,9 +1,9 @@
#include "interconnect_mon.h"
#include "interconnect_tcp_proxy.h"
-
-#include <library/cpp/json/json_value.h>
-#include <library/cpp/json/json_writer.h>
-#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <library/cpp/json/json_value.h>
+#include <library/cpp/json/json_writer.h>
+#include <library/cpp/monlib/service/pages/templates.h>
#include <openssl/ssl.h>
#include <openssl/pem.h>
@@ -14,7 +14,7 @@ namespace NInterconnect {
class TInterconnectMonActor : public TActor<TInterconnectMonActor> {
class TQueryProcessor : public TActorBootstrapped<TQueryProcessor> {
- const TActorId Sender;
+ const TActorId Sender;
const bool Json;
TMap<ui32, TInterconnectProxyTCP::TProxyStats> Stats;
ui32 PendingReplies = 0;
@@ -24,7 +24,7 @@ namespace NInterconnect {
return INTERCONNECT_MONACTOR;
}
- TQueryProcessor(const TActorId& sender, bool json)
+ TQueryProcessor(const TActorId& sender, bool json)
: Sender(sender)
, Json(json)
{}
diff --git a/library/cpp/actors/interconnect/interconnect_mon.h b/library/cpp/actors/interconnect/interconnect_mon.h
index 3fb26053fb..e78229a2c4 100644
--- a/library/cpp/actors/interconnect/interconnect_mon.h
+++ b/library/cpp/actors/interconnect/interconnect_mon.h
@@ -7,9 +7,9 @@ namespace NInterconnect {
NActors::IActor *CreateInterconnectMonActor(TIntrusivePtr<NActors::TInterconnectProxyCommon> common = nullptr);
- static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) {
+ static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) {
char s[12] = {'I', 'C', 'O', 'v', 'e', 'r', 'v', 'i', 'e', 'w', 0, 0};
- return NActors::TActorId(nodeId, TStringBuf(s, 12));
+ return NActors::TActorId(nodeId, TStringBuf(s, 12));
}
} // NInterconnect
diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
index 43419bf70d..c9f6f8b5dc 100644
--- a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
+++ b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
@@ -79,8 +79,8 @@ namespace NActors {
return true;
}
- TActorId GetNameserviceActorId() {
- return TActorId(0, "namesvc");
+ TActorId GetNameserviceActorId() {
+ return TActorId(0, "namesvc");
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 0abe9fe659..b42ae8dffd 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -6,7 +6,7 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket,
+ TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket,
TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common,
std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed,
TDuration deadPeerTimeout, TSessionParams params)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 7e2d8ccb94..4191951abd 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -3,7 +3,7 @@
#include "interconnect_tcp_session.h"
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/monlib/service/pages/templates.h>
#include <util/system/getpid.h>
namespace NActors {
@@ -45,7 +45,7 @@ namespace NActors {
LOG_INFO_IC("ICP01", "ready to work");
}
- void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
+ void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
if (!DynamicPtr) {
// perform usual bootstrap for static nodes
sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
@@ -311,9 +311,9 @@ namespace NActors {
auto event = MakeHolder<TEvHandshakeReplyOK>();
auto* pb = event->Record.MutableSuccess();
- const TActorId virtualId = GenerateSessionVirtualId();
+ const TActorId virtualId = GenerateSessionVirtualId();
pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
- pb->SetSenderActorId(virtualId.ToString());
+ pb->SetSenderActorId(virtualId.ToString());
pb->SetProgramPID(GetPID());
pb->SetProgramStartTime(Common->StartTime);
pb->SetSerial(virtualId.LocalId());
@@ -536,14 +536,14 @@ namespace NActors {
SessionVirtualId.ToString().data());
Session = nullptr;
- SessionID = TActorId();
+ SessionID = TActorId();
// drop all pending events as we are closed
ProcessPendingSessionEvents();
// reset virtual ids as this session is terminated
- SessionVirtualId = TActorId();
- RemoteSessionVirtualId = TActorId();
+ SessionVirtualId = TActorId();
+ RemoteSessionVirtualId = TActorId();
if (Metrics) {
Metrics->IncSessionDeaths();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 023e5bd1ee..e5921134ed 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -4,7 +4,7 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/core/events.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include "interconnect_common.h"
#include "interconnect_counters.h"
@@ -70,7 +70,7 @@ namespace NActors {
}
void Bootstrap();
- void Registered(TActorSystem* sys, const TActorId& owner) override;
+ void Registered(TActorSystem* sys, const TActorId& owner) override;
private:
friend class TInterconnectSessionTCP;
@@ -366,7 +366,7 @@ namespace NActors {
// read only
TInterconnectProxyCommon::TPtr const Common;
- const TActorId& GetNameserviceId() const {
+ const TActorId& GetNameserviceId() const {
return Common->NameserviceId;
}
@@ -403,24 +403,24 @@ namespace NActors {
void DropSessionEvent(STATEFN_SIG);
TInterconnectSessionTCP* Session = nullptr;
- TActorId SessionID;
+ TActorId SessionID;
// virtual ids used during handshake to check if it is the connection
// for the same session or to find out the latest shandshake
// it's virtual because session actor apears after successfull handshake
- TActorId SessionVirtualId;
- TActorId RemoteSessionVirtualId;
+ TActorId SessionVirtualId;
+ TActorId RemoteSessionVirtualId;
- TActorId GenerateSessionVirtualId() {
+ TActorId GenerateSessionVirtualId() {
ICPROXY_PROFILED;
const ui64 localId = TlsActivationContext->ExecutorThread.ActorSystem->AllocateIDSpace(1);
- return NActors::TActorId(SelfId().NodeId(), 0, localId, 0);
+ return NActors::TActorId(SelfId().NodeId(), 0, localId, 0);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TActorId IncomingHandshakeActor;
+ TActorId IncomingHandshakeActor;
TInstant IncomingHandshakeActorFilledIn;
TInstant IncomingHandshakeActorReset;
TMaybe<ui64> LastSerialFromIncomingHandshake;
@@ -429,7 +429,7 @@ namespace NActors {
void DropIncomingHandshake(bool poison = true) {
ICPROXY_PROFILED;
- if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) {
+ if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) {
LOG_DEBUG_IC("ICP111", "dropped incoming handshake: %s poison: %s", actorId.ToString().data(),
poison ? "true" : "false");
if (poison) {
@@ -444,7 +444,7 @@ namespace NActors {
void DropOutgoingHandshake(bool poison = true) {
ICPROXY_PROFILED;
- if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) {
+ if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) {
LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(),
poison ? "true" : "false");
if (poison) {
@@ -477,12 +477,12 @@ namespace NActors {
SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection);
}
- void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
+ void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
THolder<IEventBase> event);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TActorId OutgoingHandshakeActor;
+ TActorId OutgoingHandshakeActor;
TInstant OutgoingHandshakeActorCreated;
TInstant OutgoingHandshakeActorReset;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index b95c994598..2c025dc389 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -23,7 +23,7 @@ namespace NActors {
}
}
- TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
+ TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h
index fc71073c2d..086fe26ab3 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h
@@ -34,7 +34,7 @@ namespace NActors {
}
}
- TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override;
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override;
void Die(const TActorContext& ctx) override;
@@ -50,8 +50,8 @@ namespace NActors {
TInterconnectProxyCommon::TPtr const ProxyCommonCtx;
};
- static inline TActorId MakeInterconnectListenerActorId(bool dynamic) {
+ static inline TActorId MakeInterconnectListenerActorId(bool dynamic) {
char x[12] = {'I', 'C', 'L', 'i', 's', 't', 'e', 'n', 'e', 'r', '/', dynamic ? 'D' : 'S'};
- return TActorId(0, TStringBuf(x, 12));
+ return TActorId(0, TStringBuf(x, 12));
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 2ded7f9f53..468e8bdd64 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -7,7 +7,7 @@
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/monlib/service/pages/templates.h>
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
@@ -474,7 +474,7 @@ namespace NActors {
if (ev->Sender == ReceiverId) {
const bool wasConnected(Socket);
LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
- ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
+ ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
if (wasConnected) {
// we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
// restart handshake process, closing our part of socket first
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 7fc00dbcc5..dfab4065c0 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -10,7 +10,7 @@
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/util/funnel_queue.h>
#include <library/cpp/actors/util/recentwnd.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <util/generic/queue.h>
@@ -179,7 +179,7 @@ namespace NActors {
return INTERCONNECT_SESSION_TCP;
}
- TInputSessionTCP(const TActorId& sessionId,
+ TInputSessionTCP(const TActorId& sessionId,
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
TIntrusivePtr<TReceiveContext> context,
TInterconnectProxyCommon::TPtr common,
@@ -495,7 +495,7 @@ namespace NActors {
void GenerateHttpInfo(TStringStream& str);
TIntrusivePtr<TReceiveContext> ReceiveContext;
- TActorId ReceiverId;
+ TActorId ReceiverId;
TDuration Ping;
ui64 ConfirmPacketsForcedBySize = 0;
@@ -513,7 +513,7 @@ namespace NActors {
: public TActorBootstrapped<TInterconnectSessionKiller> {
ui32 RepliesReceived = 0;
ui32 RepliesNumber = 0;
- TActorId LargestSession = TActorId();
+ TActorId LargestSession = TActorId();
ui64 MaxBufferSize = 0;
TInterconnectProxyCommon::TPtr Common;
@@ -529,7 +529,7 @@ namespace NActors {
void Bootstrap() {
auto sender = SelfId();
- const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
+ const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
auto ev = new TEvSessionBufferSizeRequest();
return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
};
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index 2a8443da71..22850b3126 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -72,7 +72,7 @@ namespace NInterconnect {
};
class TLoadResponderMasterActor : public TActorBootstrapped<TLoadResponderMasterActor> {
- TVector<TActorId> Slaves;
+ TVector<TActorId> Slaves;
ui32 SlaveIndex = 0;
STRICT_STFUNC(StateFunc,
@@ -93,7 +93,7 @@ namespace NInterconnect {
}
void Die(const TActorContext& ctx) override {
- for (const TActorId& actorId : Slaves) {
+ for (const TActorId& actorId : Slaves) {
ctx.Send(actorId, new TEvents::TEvPoisonPill);
}
TActorBootstrapped::Die(ctx);
@@ -122,9 +122,9 @@ namespace NInterconnect {
return new TLoadResponderMasterActor();
}
- TActorId MakeLoadResponderActorId(ui32 nodeId) {
+ TActorId MakeLoadResponderActorId(ui32 nodeId) {
char x[12] = {'I', 'C', 'L', 'o', 'a', 'd', 'R', 'e', 's', 'p', 'A', 'c'};
- return TActorId(nodeId, TStringBuf(x, 12));
+ return TActorId(nodeId, TStringBuf(x, 12));
}
class TLoadActor: public TActorBootstrapped<TLoadActor> {
@@ -144,8 +144,8 @@ namespace NInterconnect {
TInstant NextMessageTimestamp;
THashMap<TString, TMessageInfo> InFly;
ui64 NextId = 1;
- TVector<TActorId> Hops;
- TActorId FirstHop;
+ TVector<TActorId> Hops;
+ TActorId FirstHop;
ui64 NumDropped = 0;
std::shared_ptr<std::atomic_uint64_t> Traffic;
@@ -167,7 +167,7 @@ namespace NInterconnect {
Traffic = std::move(ev->Get()->Traffic);
for (const ui32 nodeId : Params.NodeHops) {
- const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId();
+ const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId();
if (!FirstHop) {
FirstHop = actorId;
} else {
diff --git a/library/cpp/actors/interconnect/load.h b/library/cpp/actors/interconnect/load.h
index 0a01a0dc04..060fa7641b 100644
--- a/library/cpp/actors/interconnect/load.h
+++ b/library/cpp/actors/interconnect/load.h
@@ -5,7 +5,7 @@
namespace NInterconnect {
// load responder -- lives on every node as a service actor
NActors::IActor* CreateLoadResponderActor();
- NActors::TActorId MakeLoadResponderActorId(ui32 node);
+ NActors::TActorId MakeLoadResponderActorId(ui32 node);
// load actor -- generates load with specific parameters
struct TLoadParams {
diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp
index 884503e602..1267920559 100644
--- a/library/cpp/actors/interconnect/mock/ic_mock.cpp
+++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp
@@ -42,7 +42,7 @@ namespace NActors {
: Key(key)
{}
- void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) {
+ void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) {
TPeerInfo *peer = GetPeer(nodeId);
auto guard = TWriteGuard(peer->Mutex);
Y_VERIFY(!peer->ActorSystem);
@@ -188,7 +188,7 @@ namespace NActors {
, Common(std::move(common))
{}
- void Registered(TActorSystem *as, const TActorId& parent) override {
+ void Registered(TActorSystem *as, const TActorId& parent) override {
TActor::Registered(as, parent);
State.Attach(NodeId, as, SelfId());
}
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 4ba50a2b5f..187d0b6bdf 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -18,7 +18,7 @@
using NActors::IEventBase;
using NActors::IEventHandle;
-using NActors::TActorId;
+using NActors::TActorId;
using NActors::TConstIoVec;
using NActors::TEventSerializedData;
@@ -91,8 +91,8 @@ union TTcpPacketBuf {
struct TEventDescr {
ui32 Type;
ui32 Flags;
- TActorId Recipient;
- TActorId Sender;
+ TActorId Recipient;
+ TActorId Sender;
ui64 Cookie;
// wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor
NWilson::TTraceId::TSerializedTraceId TraceId;
@@ -102,7 +102,7 @@ struct TEventDescr {
struct TEventHolder : TNonCopyable {
TEventDescr Descr;
- TActorId ForwardRecipient;
+ TActorId ForwardRecipient;
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
ui64 Serial;
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index e75cbcaef4..8c7b61a7a7 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -1,35 +1,35 @@
#include "poller_actor.h"
#include "interconnect_common.h"
-#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/probes.h>
-#include <library/cpp/actors/protos/services_common.pb.h>
+#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/actors/util/funnel_queue.h>
-
+
#include <util/generic/intrlist.h>
#include <util/system/thread.h>
#include <util/system/event.h>
#include <util/system/pipe.h>
#include <variant>
-
+
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- namespace {
+ namespace {
int LastSocketError() {
#if defined(_win_)
return WSAGetLastError();
#else
return errno;
#endif
- }
- }
-
+ }
+ }
+
struct TSocketRecord : TThrRefBase {
const TIntrusivePtr<TSharedDescriptor> Socket;
const TActorId ReadActorId;
@@ -57,7 +57,7 @@ namespace NActors {
: Socket(std::move(socket))
{}
};
-
+
using TPollerSyncOperation = std::variant<TPollerExitThread, TPollerWakeup, TPollerUnregisterSocket>;
struct TPollerSyncOperationWrapper {
@@ -149,7 +149,7 @@ namespace NActors {
bool DrainReadEnd() {
size_t totalRead = 0;
char buffer[4096];
- for (;;) {
+ for (;;) {
ssize_t n = ReadEnd.Read(buffer, sizeof(buffer));
if (n < 0) {
const int error = LastSocketError();
@@ -157,17 +157,17 @@ namespace NActors {
continue;
} else if (error == EAGAIN || error == EWOULDBLOCK) {
break;
- } else {
+ } else {
Y_FAIL("read() failed with %s", strerror(errno));
- }
+ }
} else {
Y_VERIFY(n);
totalRead += n;
- }
+ }
}
return totalRead;
}
-
+
bool ProcessSyncOpQueue() {
if (DrainReadEnd()) {
Y_VERIFY(!SyncOperationsQ.IsEmpty());
@@ -181,25 +181,25 @@ namespace NActors {
return false; // terminate the thread
} else if (std::get_if<TPollerWakeup>(&op->Operation)) {
op->SignalDone();
- } else {
+ } else {
Y_FAIL();
- }
+ }
} while (SyncOperationsQ.Pop());
- }
+ }
return true;
- }
-
+ }
+
void *ThreadProc() override {
SetCurrentThreadName("network poller");
while (ProcessSyncOpQueue()) {
static_cast<TDerived&>(*this).ProcessEventsInLoop();
- }
+ }
return nullptr;
- }
+ }
};
-
+
} // namespace NActors
-
+
#if defined(_linux_)
# include "poller_actor_linux.h"
#elif defined(_darwin_)
@@ -209,38 +209,38 @@ namespace NActors {
#else
# error "Unsupported platform"
#endif
-
+
namespace NActors {
-
+
class TPollerToken::TImpl {
std::weak_ptr<TPollerThread> Thread;
TIntrusivePtr<TSocketRecord> Record; // valid only when Thread is held locked
-
- public:
+
+ public:
TImpl(std::shared_ptr<TPollerThread> thread, TIntrusivePtr<TSocketRecord> record)
: Thread(thread)
, Record(std::move(record))
- {
+ {
thread->RegisterSocket(Record);
}
-
+
~TImpl() {
if (auto thread = Thread.lock()) {
thread->UnregisterSocket(Record);
- }
- }
-
+ }
+ }
+
void Request(bool read, bool write) {
if (auto thread = Thread.lock()) {
thread->Request(Record, read, write);
- }
- }
+ }
+ }
const TIntrusivePtr<TSharedDescriptor>& Socket() const {
return Record->Socket;
}
- };
-
+ };
+
class TPollerActor: public TActorBootstrapped<TPollerActor> {
// poller thread
std::shared_ptr<TPollerThread> PollerThread;
diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h
index f927b82089..5bd4f50704 100644
--- a/library/cpp/actors/interconnect/poller_actor.h
+++ b/library/cpp/actors/interconnect/poller_actor.h
@@ -55,9 +55,9 @@ namespace NActors {
IActor* CreatePollerActor();
- inline TActorId MakePollerActorId() {
+ inline TActorId MakePollerActorId() {
char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'};
- return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
}
}
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 565a511859..bbdabbd339 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
index e6b2bd4e4c..334859882f 100644
--- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
@@ -2,7 +2,7 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/interconnect/interconnect_common.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/interconnect/event_holder_pool.h>
#include <atomic>
diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp
index ba2a50c6f6..d67509f058 100644
--- a/library/cpp/actors/interconnect/ut/large.cpp
+++ b/library/cpp/actors/interconnect/ut/large.cpp
@@ -14,10 +14,10 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
using namespace NActors;
class TProducer: public TActorBootstrapped<TProducer> {
- const TActorId RecipientActorId;
+ const TActorId RecipientActorId;
public:
- TProducer(const TActorId& recipientActorId)
+ TProducer(const TActorId& recipientActorId)
: RecipientActorId(recipientActorId)
{}
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
class TConsumer : public TActorBootstrapped<TConsumer> {
TManualEvent& Done;
- TActorId SessionId;
+ TActorId SessionId;
public:
TConsumer(TManualEvent& done)
@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
TManualEvent done;
TConsumer* consumer = new TConsumer(done);
- const TActorId recp = testCluster.RegisterActor(consumer, 1);
+ const TActorId recp = testCluster.RegisterActor(consumer, 1);
testCluster.RegisterActor(new TProducer(recp), 2);
done.WaitI();
}
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
index 2b6d27cd3f..ac46180804 100644
--- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
+++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -70,7 +70,7 @@ public:
~TTestICCluster() {
}
- TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
+ TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
return Nodes[nodeId]->RegisterActor(actor);
}
@@ -78,7 +78,7 @@ public:
return Nodes[nodeId]->InterconnectProxy(peerNodeId);
}
- void KillActor(ui32 nodeId, const TActorId& id) {
+ void KillActor(ui32 nodeId, const TActorId& id) {
Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill);
}
};
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index ff30b1445e..59dd2554c8 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -62,7 +62,7 @@ public:
setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),
TMailboxType::ReadAsFilled, 0));
- const TActorId loggerActorId(0, "logger");
+ const TActorId loggerActorId(0, "logger");
constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
auto loggerSettings = MakeIntrusive<NLog::TSettings>(
@@ -114,7 +114,7 @@ public:
ActorSystem->Stop();
}
- bool Send(const TActorId& recipient, IEventBase* ev) {
+ bool Send(const TActorId& recipient, IEventBase* ev) {
return ActorSystem->Send(recipient, ev);
}
@@ -127,7 +127,7 @@ public:
}
void RegisterServiceActor(const TActorId& serviceId, IActor* actor) {
- const TActorId actorId = ActorSystem->Register(actor);
+ const TActorId actorId = ActorSystem->Register(actor);
ActorSystem->RegisterLocalService(serviceId, actorId);
}
diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h
index 7591200471..07fe10d93a 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_actors.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h
@@ -3,13 +3,13 @@
namespace NActors {
class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {
protected:
- const TActorId RecipientActorId;
+ const TActorId RecipientActorId;
const ui32 Preload;
ui64 SequenceNumber = 0;
ui32 InFlySize = 0;
public:
- TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
+ TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
: RecipientActorId(recipientActorId)
, Preload(preload)
{
diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
index 23d846a2fd..dbd05ce746 100644
--- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
@@ -1,38 +1,38 @@
-#include <library/cpp/actors/interconnect/poller_actor.h>
-#include <library/cpp/actors/testlib/test_runtime.h>
-
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
+
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/network/pair.h>
-#include <util/network/socket.h>
-
-using namespace NActors;
-
-class TTestSocket: public TSharedDescriptor {
-public:
- explicit TTestSocket(SOCKET fd)
- : Fd_(fd)
- {
- }
-
- int GetDescriptor() override {
- return Fd_;
- }
-
-private:
- SOCKET Fd_;
-};
-using TTestSocketPtr = TIntrusivePtr<TTestSocket>;
-
-// create pair of connected, non-blocking sockets
-std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() {
- SOCKET fds[2];
- SocketPair(fds);
- SetNonBlock(fds[0]);
- SetNonBlock(fds[1]);
- return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])};
-}
-
+
+#include <util/network/pair.h>
+#include <util/network/socket.h>
+
+using namespace NActors;
+
+class TTestSocket: public TSharedDescriptor {
+public:
+ explicit TTestSocket(SOCKET fd)
+ : Fd_(fd)
+ {
+ }
+
+ int GetDescriptor() override {
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
+};
+using TTestSocketPtr = TIntrusivePtr<TTestSocket>;
+
+// create pair of connected, non-blocking sockets
+std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() {
+ SOCKET fds[2];
+ SocketPair(fds);
+ SetNonBlock(fds[0]);
+ SetNonBlock(fds[1]);
+ return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])};
+}
+
std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() {
// create server (listening) socket
SOCKET server = socket(AF_INET, SOCK_STREAM, 0);
@@ -74,101 +74,101 @@ std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() {
return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted));
}
-class TPollerActorTest: public TTestBase {
- UNIT_TEST_SUITE(TPollerActorTest);
- UNIT_TEST(Registration)
- UNIT_TEST(ReadNotification)
- UNIT_TEST(WriteNotification)
- UNIT_TEST(HangupNotification)
- UNIT_TEST_SUITE_END();
-
-public:
- void SetUp() override {
- ActorSystem_ = MakeHolder<TTestActorRuntimeBase>();
- ActorSystem_->Initialize();
-
- PollerId_ = ActorSystem_->Register(CreatePollerActor());
-
- TDispatchOptions opts;
- opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
- ActorSystem_->DispatchEvents(opts);
- }
-
- void Registration() {
- auto [s1, s2] = NonBlockSockets();
- auto readerId = ActorSystem_->AllocateEdgeActor();
- auto writerId = ActorSystem_->AllocateEdgeActor();
-
+class TPollerActorTest: public TTestBase {
+ UNIT_TEST_SUITE(TPollerActorTest);
+ UNIT_TEST(Registration)
+ UNIT_TEST(ReadNotification)
+ UNIT_TEST(WriteNotification)
+ UNIT_TEST(HangupNotification)
+ UNIT_TEST_SUITE_END();
+
+public:
+ void SetUp() override {
+ ActorSystem_ = MakeHolder<TTestActorRuntimeBase>();
+ ActorSystem_->Initialize();
+
+ PollerId_ = ActorSystem_->Register(CreatePollerActor());
+
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
+ ActorSystem_->DispatchEvents(opts);
+ }
+
+ void Registration() {
+ auto [s1, s2] = NonBlockSockets();
+ auto readerId = ActorSystem_->AllocateEdgeActor();
+ auto writerId = ActorSystem_->AllocateEdgeActor();
+
RegisterSocket(s1, readerId, writerId);
-
- // reader should receive event after socket registration
+
+ // reader should receive event after socket registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId);
token = ev->Get()->PollerToken;
- }
-
- // writer should receive event after socket registration
- {
+ }
+
+ // writer should receive event after socket registration
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId);
UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken);
- }
- }
-
- void ReadNotification() {
- auto [r, w] = NonBlockSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ }
+ }
+
+ void ReadNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
RegisterSocket(r, clientId, {});
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
- char buf;
-
- // data not ready yet for read
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
- UNIT_ASSERT(errno == EWOULDBLOCK);
-
+ }
+
+ char buf;
+
+ // data not ready yet for read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+
// request read poll
token->Request(true, false);
- // write data
- UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1);
-
- // notification after socket become readable
- {
+ // write data
+ UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1);
+
+ // notification after socket become readable
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
UNIT_ASSERT(ev->Get()->Read);
UNIT_ASSERT(!ev->Get()->Write);
- }
-
- // read data
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1);
- UNIT_ASSERT_EQUAL('x', buf);
-
- // no more data to read
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
- UNIT_ASSERT(errno == EWOULDBLOCK);
- }
-
- void WriteNotification() {
+ }
+
+ // read data
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1);
+ UNIT_ASSERT_EQUAL('x', buf);
+
+ // no more data to read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+ }
+
+ void WriteNotification() {
auto [r, w] = TcpSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
SetNonBlock(w->GetDescriptor());
RegisterSocket(w, TActorId{}, clientId);
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
{
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
+ }
+
char buffer[4096];
memset(buffer, 'x', sizeof(buffer));
@@ -181,7 +181,7 @@ public:
written += res;
} else if (res == 0) {
UNIT_FAIL("unexpected zero return from send()");
- } else {
+ } else {
UNIT_ASSERT(res == -1);
if (errno == EINTR) {
continue;
@@ -191,10 +191,10 @@ public:
} else {
UNIT_FAIL("unexpected error from send()");
}
- }
- }
+ }
+ }
Cerr << "written " << written << " bytes" << Endl;
-
+
// read all written data from the read end
for (;;) {
char buffer[4096];
@@ -216,7 +216,7 @@ public:
}
}
}
-
+
// wait for notification after socket becomes writable again
{
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
@@ -224,41 +224,41 @@ public:
UNIT_ASSERT(!ev->Get()->Read);
UNIT_ASSERT(ev->Get()->Write);
}
- }
- }
-
- void HangupNotification() {
- auto [r, w] = NonBlockSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ }
+ }
+
+ void HangupNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
RegisterSocket(r, clientId, TActorId{});
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
+ }
+
token->Request(true, false);
ShutDown(w->GetDescriptor(), SHUT_RDWR);
-
+
// notification after peer shuts down its socket
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
UNIT_ASSERT(ev->Get()->Read);
- }
- }
-
-private:
+ }
+ }
+
+private:
void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) {
auto ev = new TEvPollerRegister{socket, readActorId, writeActorId};
- ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
- }
-
-private:
- THolder<TTestActorRuntimeBase> ActorSystem_;
- TActorId PollerId_;
-};
-
-UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest);
+ ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
+ }
+
+private:
+ THolder<TTestActorRuntimeBase> ActorSystem_;
+ TActorId PollerId_;
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest);
diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make
index 2f5b13352e..ec19f1a64a 100644
--- a/library/cpp/actors/interconnect/ut/ya.make
+++ b/library/cpp/actors/interconnect/ut/ya.make
@@ -15,11 +15,11 @@ ELSE()
ENDIF()
SRCS(
- channel_scheduler_ut.cpp
+ channel_scheduler_ut.cpp
event_holder_pool_ut.cpp
interconnect_ut.cpp
large.cpp
- poller_actor_ut.cpp
+ poller_actor_ut.cpp
dynamic_proxy_ut.cpp
)
@@ -28,7 +28,7 @@ PEERDIR(
library/cpp/actors/interconnect
library/cpp/actors/interconnect/ut/lib
library/cpp/actors/interconnect/ut/protos
- library/cpp/actors/testlib
+ library/cpp/actors/testlib
library/cpp/digest/md5
library/cpp/testing/unittest
)
diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp
index 5d19bc3003..69374cd080 100644
--- a/library/cpp/actors/interconnect/ut_fat/main.cpp
+++ b/library/cpp/actors/interconnect/ut_fat/main.cpp
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
ui16 SendFlags;
public:
- TSenderActor(const TActorId& recipientActorId, ui16 sendFlags)
+ TSenderActor(const TActorId& recipientActorId, ui16 sendFlags)
: TSenderBaseActor(recipientActorId, 32)
, SendFlags(sendFlags)
{
@@ -108,7 +108,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
- const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
+ const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
TSenderActor* senderActor = new TSenderActor(recipient, flags);
testCluster.RegisterActor(senderActor, 1);
@@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
- const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
+ const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
TSenderActor* senderActor = new TSenderActor(recipient, flags);
testCluster.RegisterActor(senderActor, 1);
diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make
index 60d29b0fc0..9e4fb46fdb 100644
--- a/library/cpp/actors/interconnect/ya.make
+++ b/library/cpp/actors/interconnect/ya.make
@@ -75,18 +75,18 @@ PEERDIR(
contrib/libs/libc_compat
contrib/libs/openssl
library/cpp/actors/core
- library/cpp/actors/dnscachelib
+ library/cpp/actors/dnscachelib
library/cpp/actors/dnsresolver
library/cpp/actors/helpers
library/cpp/actors/prof
library/cpp/actors/protos
library/cpp/actors/util
library/cpp/digest/crc32c
- library/cpp/json
+ library/cpp/json
library/cpp/lwtrace
- library/cpp/monlib/dynamic_counters
+ library/cpp/monlib/dynamic_counters
library/cpp/monlib/metrics
- library/cpp/monlib/service/pages/tablesorter
+ library/cpp/monlib/service/pages/tablesorter
library/cpp/openssl/init
library/cpp/packedtypes
)