aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:02 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:02 +0300
commit3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch)
treec2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/actors
parentab3783171cc30e262243a0227c86118f7080c896 (diff)
downloadydb-3e0b762a82514bac89c1dd6ea7211e381d8aa248.tar.gz
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actor.cpp46
-rw-r--r--library/cpp/actors/core/actor.h154
-rw-r--r--library/cpp/actors/core/actor_bootstrapped.h6
-rw-r--r--library/cpp/actors/core/actor_coroutine.h12
-rw-r--r--library/cpp/actors/core/actor_coroutine_ut.cpp4
-rw-r--r--library/cpp/actors/core/actorid.cpp6
-rw-r--r--library/cpp/actors/core/actorid.h34
-rw-r--r--library/cpp/actors/core/actorsystem.cpp48
-rw-r--r--library/cpp/actors/core/actorsystem.h88
-rw-r--r--library/cpp/actors/core/actorsystem_ut.cpp90
-rw-r--r--library/cpp/actors/core/ask.cpp6
-rw-r--r--library/cpp/actors/core/ask.h2
-rw-r--r--library/cpp/actors/core/event.h38
-rw-r--r--library/cpp/actors/core/event_pb.h6
-rw-r--r--library/cpp/actors/core/events.h14
-rw-r--r--library/cpp/actors/core/events_undelivered.cpp2
-rw-r--r--library/cpp/actors/core/executelater.h12
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp6
-rw-r--r--library/cpp/actors/core/executor_pool_base.h4
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp6
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h4
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp16
-rw-r--r--library/cpp/actors/core/executor_pool_io.cpp10
-rw-r--r--library/cpp/actors/core/executor_pool_io.h2
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp6
-rw-r--r--library/cpp/actors/core/executor_pool_united.h4
-rw-r--r--library/cpp/actors/core/executor_pool_united_ut.cpp2
-rw-r--r--library/cpp/actors/core/executor_thread.cpp20
-rw-r--r--library/cpp/actors/core/executor_thread.h14
-rw-r--r--library/cpp/actors/core/invoke.h2
-rw-r--r--library/cpp/actors/core/log.cpp2
-rw-r--r--library/cpp/actors/core/log.h4
-rw-r--r--library/cpp/actors/core/log_settings.cpp4
-rw-r--r--library/cpp/actors/core/log_settings.h6
-rw-r--r--library/cpp/actors/core/log_ut.cpp4
-rw-r--r--library/cpp/actors/core/mailbox.cpp2
-rw-r--r--library/cpp/actors/core/mailbox.h6
-rw-r--r--library/cpp/actors/core/mon.h4
-rw-r--r--library/cpp/actors/core/process_stats.cpp68
-rw-r--r--library/cpp/actors/core/process_stats.h6
-rw-r--r--library/cpp/actors/core/scheduler_actor.cpp4
-rw-r--r--library/cpp/actors/core/scheduler_actor.h4
-rw-r--r--library/cpp/actors/core/scheduler_actor_ut.cpp8
-rw-r--r--library/cpp/actors/core/ut/ya.make10
-rw-r--r--library/cpp/actors/core/ya.make2
-rw-r--r--library/cpp/actors/helpers/activeactors.h6
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.cpp12
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.h2
-rw-r--r--library/cpp/actors/helpers/mon_histogram_helper.h8
-rw-r--r--library/cpp/actors/helpers/selfping_actor.h2
-rw-r--r--library/cpp/actors/helpers/ya.make2
-rw-r--r--library/cpp/actors/http/http_cache.cpp4
-rw-r--r--library/cpp/actors/http/http_cache.h2
-rw-r--r--library/cpp/actors/http/http_proxy.cpp26
-rw-r--r--library/cpp/actors/http/http_proxy.h34
-rw-r--r--library/cpp/actors/http/http_proxy_acceptor.cpp16
-rw-r--r--library/cpp/actors/http/http_proxy_outgoing.cpp14
-rw-r--r--library/cpp/actors/http/http_ut.cpp24
-rw-r--r--library/cpp/actors/http/ya.make2
-rw-r--r--library/cpp/actors/interconnect/events_local.h28
-rw-r--r--library/cpp/actors/interconnect/interconnect.h10
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h8
-rw-r--r--library/cpp/actors/interconnect/interconnect_counters.cpp10
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp34
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_impl.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_mon.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_mon.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_nameserver_table.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp14
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h26
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.h6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h10
-rw-r--r--library/cpp/actors/interconnect/load.cpp14
-rw-r--r--library/cpp/actors/interconnect/load.h2
-rw-r--r--library/cpp/actors/interconnect/mock/ic_mock.cpp4
-rw-r--r--library/cpp/actors/interconnect/packet.h8
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp72
-rw-r--r--library/cpp/actors/interconnect/poller_actor.h4
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/large.cpp8
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h4
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h6
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_actors.h4
-rw-r--r--library/cpp/actors/interconnect/ut/poller_actor_ut.cpp272
-rw-r--r--library/cpp/actors/interconnect/ut/ya.make6
-rw-r--r--library/cpp/actors/interconnect/ut_fat/main.cpp6
-rw-r--r--library/cpp/actors/interconnect/ya.make8
-rw-r--r--library/cpp/actors/protos/actors.proto4
-rw-r--r--library/cpp/actors/protos/interconnect.proto6
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp102
-rw-r--r--library/cpp/actors/testlib/test_runtime.h74
97 files changed, 871 insertions, 871 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index 6f9ba6a42b..c84b8d7e2f 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -7,7 +7,7 @@ namespace NActors {
Y_POD_THREAD(TActivationContext*)
TlsActivationContext((TActivationContext*)nullptr);
- bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
+ bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
return Send(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
}
@@ -15,7 +15,7 @@ namespace NActors {
return ExecutorThread.Send(ev);
}
- void IActor::Registered(TActorSystem* sys, const TActorId& owner) {
+ void IActor::Registered(TActorSystem* sys, const TActorId& owner) {
// fallback to legacy method, do not use it anymore
if (auto eh = AfterRegister(SelfId(), owner))
sys->Send(eh);
@@ -25,7 +25,7 @@ namespace NActors {
SelfActorId.Out(out);
}
- bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept {
+ bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept {
return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId));
}
@@ -33,19 +33,19 @@ namespace NActors {
return TlsActivationContext->ExecutorThread.Send(ev);
}
- void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
- TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
- }
-
+ void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
+ }
+
void TActivationContext::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
}
void TActivationContext::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
- TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie);
+ TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie);
}
- bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
+ bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
return TActivationContext::Send(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
}
@@ -75,11 +75,11 @@ namespace NActors {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId.Hint(), SelfActorId);
}
- TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
+ TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, parentId);
}
- TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) {
+ TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) {
return TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(destinationNodeId);
}
@@ -95,36 +95,36 @@ namespace NActors {
return NHPTimer::GetSeconds(GetCurrentEventTicks());
}
- TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
+ TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
return ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfID);
}
- TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
+ TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId);
}
- void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
- }
-
+ void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
+ ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
+ }
+
void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
}
void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie);
- }
-
- void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
+ ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie);
}
+ void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
+ TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
+ }
+
void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
+ TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
TInstant TActivationContext::Now() {
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index ed29bd14b9..8bfef6b5bd 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -36,17 +36,17 @@ namespace NActors {
public:
static bool Send(TAutoPtr<IEventHandle> ev);
-
- /**
- * Schedule one-shot event that will be send at given time point in the future.
- *
+
+ /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
* @param deadline the wallclock time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
-
- /**
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+
+ /**
* Schedule one-shot event that will be send at given time point in the future.
*
* @param deadline the monotonic time point in future when event must be send
@@ -56,12 +56,12 @@ namespace NActors {
static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
/**
- * Schedule one-shot event that will be send after given delay.
- *
- * @param delta the time from now to delay event sending
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
+ * Schedule one-shot event that will be send after given delay.
+ *
+ * @param delta the time from now to delay event sending
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
static TInstant Now();
@@ -79,9 +79,9 @@ namespace NActors {
static TActorId RegisterWithSameMailbox(IActor* actor, TActorId parentId);
static const TActorContext& AsActorContext();
- static TActorContext ActorContextFor(TActorId id);
+ static TActorContext ActorContextFor(TActorId id);
- static TActorId InterconnectProxy(ui32 nodeid);
+ static TActorId InterconnectProxy(ui32 nodeid);
static TActorSystem* ActorSystem();
static i64 GetCurrentEventTicks();
@@ -89,34 +89,34 @@ namespace NActors {
};
struct TActorContext: public TActivationContext {
- const TActorId SelfID;
+ const TActorId SelfID;
- explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
+ explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
: TActivationContext(mailbox, executorThread, eventStart)
, SelfID(selfID)
{
}
- bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
+ bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
template <typename TEvent>
- bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
+ bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId));
}
bool Send(TAutoPtr<IEventHandle> ev) const;
-
+
TInstant Now() const;
TMonotonic Monotonic() const;
- /**
- * Schedule one-shot event that will be send at given time point in the future.
- *
+ /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
* @param deadline the wallclock time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
-
- /**
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
+
+ /**
* Schedule one-shot event that will be send at given time point in the future.
*
* @param deadline the monotonic time point in future when event must be send
@@ -126,20 +126,20 @@ namespace NActors {
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
/**
- * Schedule one-shot event that will be send after given delay.
- *
- * @param delta the time from now to delay event sending
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
+ * Schedule one-shot event that will be send after given delay.
+ *
+ * @param delta the time from now to delay event sending
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
-
- TActorContext MakeFor(const TActorId& otherId) const {
+
+ TActorContext MakeFor(const TActorId& otherId) const {
return TActorContext(Mailbox, ExecutorThread, EventStart, otherId);
}
// register new actor in ActorSystem on new fresh mailbox.
- TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const;
+ TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const;
// Register new actor in ActorSystem on same _mailbox_ as current actor.
// There is one thread per mailbox to execute actor, which mean
@@ -153,17 +153,17 @@ namespace NActors {
extern Y_POD_THREAD(TActivationContext*) TlsActivationContext;
- struct TActorIdentity: public TActorId {
- explicit TActorIdentity(TActorId actorId)
- : TActorId(actorId)
+ struct TActorIdentity: public TActorId {
+ explicit TActorIdentity(TActorId actorId)
+ : TActorId(actorId)
{
}
- void operator=(TActorId actorId) {
+ void operator=(TActorId actorId) {
*this = TActorIdentity(actorId);
}
- bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
+ bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
@@ -174,18 +174,18 @@ namespace NActors {
class IActorOps : TNonCopyable {
public:
virtual void Describe(IOutputStream&) const noexcept = 0;
- virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
-
- /**
- * Schedule one-shot event that will be send at given time point in the future.
- *
+ virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
+
+ /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
* @param deadline the wallclock time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
-
- /**
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
+
+ /**
* Schedule one-shot event that will be send at given time point in the future.
*
* @param deadline the monotonic time point in future when event must be send
@@ -195,15 +195,15 @@ namespace NActors {
virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
/**
- * Schedule one-shot event that will be send after given delay.
- *
- * @param delta the time from now to delay event sending
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
-
- virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
+ * Schedule one-shot event that will be send after given delay.
+ *
+ * @param delta the time from now to delay event sending
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
+
+ virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;
};
@@ -219,7 +219,7 @@ namespace NActors {
i64 ElapsedTicks;
ui64 HandledEvents;
- friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
+ friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
friend class TDecorator;
public:
@@ -254,7 +254,7 @@ namespace NActors {
protected:
IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER)
: StateFunc(stateFunc)
- , SelfActorId(TActorId())
+ , SelfActorId(TActorId())
, ElapsedTicks(0)
, HandledEvents(0)
, ActivityType(activityType)
@@ -310,7 +310,7 @@ namespace NActors {
InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
struct TRecurseContext : TActorContext {
TActivationContext *Prev;
- TRecurseContext(const TActorId& actorId)
+ TRecurseContext(const TActorId& actorId)
: TActorContext(TActivationContext::ActorContextFor(actorId))
, Prev(TlsActivationContext)
{
@@ -323,9 +323,9 @@ namespace NActors {
return (actor.*method)(std::forward<TArgs>(args)...);
}
- virtual void Registered(TActorSystem* sys, const TActorId& owner);
+ virtual void Registered(TActorSystem* sys, const TActorId& owner);
- virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) {
+ virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) {
Y_UNUSED(self);
Y_UNUSED(parentId);
return TAutoPtr<IEventHandle>();
@@ -350,23 +350,23 @@ namespace NActors {
protected:
void Describe(IOutputStream&) const noexcept override;
- bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
+ bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
template <typename TEvent>
- bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
+ bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId));
}
-
+
template <class TEvent, class ... TEventArgs>
bool Send(TActorId recipient, TEventArgs&& ... args) const {
return Send(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...));
}
- void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
+ void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
// register new actor in ActorSystem on new fresh mailbox.
- TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
+ TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
// Register new actor in ActorSystem on same _mailbox_ as current actor.
// There is one thread per mailbox to execute actor, which mean
@@ -454,7 +454,7 @@ namespace NActors {
return *static_cast<TActorContext*>(tls);
}
- inline TActorContext TActivationContext::ActorContextFor(TActorId id) {
+ inline TActorContext TActivationContext::ActorContextFor(TActorId id) {
auto& tls = *TlsActivationContext;
return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id);
}
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h
index a37887c939..25e6b3fbe2 100644
--- a/library/cpp/actors/core/actor_bootstrapped.h
+++ b/library/cpp/actors/core/actor_bootstrapped.h
@@ -9,7 +9,7 @@ namespace NActors {
template<typename TDerived>
class TActorBootstrapped : public TActor<TDerived> {
protected:
- TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override {
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override {
return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0);
}
@@ -19,11 +19,11 @@ namespace NActors {
TDerived& self = static_cast<TDerived&>(*this);
if constexpr (std::is_invocable_v<T, TDerived, const TActorContext&>) {
self.Bootstrap(ctx);
- } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&, const TActorContext&>) {
+ } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&, const TActorContext&>) {
self.Bootstrap(ev->Sender, ctx);
} else if constexpr (std::is_invocable_v<T, TDerived>) {
self.Bootstrap();
- } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&>) {
+ } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&>) {
self.Bootstrap(ev->Sender);
} else {
static_assert(dependent_false<TDerived>::value, "No correct Bootstrap() signature");
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index 6bcb768eaf..af7328d3a1 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -24,8 +24,8 @@ namespace NActors {
TActorContext *ActorContext = nullptr;
protected:
- TActorIdentity SelfActorId = TActorIdentity(TActorId());
- TActorId ParentActorId;
+ TActorIdentity SelfActorId = TActorIdentity(TActorId());
+ TActorId ParentActorId;
private:
template <typename TFirstEvent, typename... TOtherEvents>
@@ -107,12 +107,12 @@ namespace NActors {
TActorSystem *GetActorSystem() const { return GetActorContext().ExecutorThread.ActorSystem; }
TInstant Now() const { return GetActorContext().Now(); }
- bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
+ bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return GetActorContext().Send(recipient, ev, flags, cookie, std::move(traceId));
}
template <typename TEvent>
- bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
+ bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return GetActorContext().Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
@@ -130,7 +130,7 @@ namespace NActors {
return GetActorContext().Schedule(deadline, ev, cookie);
}
- TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) {
+ TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) {
return GetActorContext().Register(actor, mailboxType, poolId);
}
@@ -159,7 +159,7 @@ namespace NActors {
, Impl(std::move(impl))
{}
- TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override {
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override {
return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0);
}
diff --git a/library/cpp/actors/core/actor_coroutine_ut.cpp b/library/cpp/actors/core/actor_coroutine_ut.cpp
index 951512b877..13bfdcc2bf 100644
--- a/library/cpp/actors/core/actor_coroutine_ut.cpp
+++ b/library/cpp/actors/core/actor_coroutine_ut.cpp
@@ -29,7 +29,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) {
};
class TBasicResponderActor: public TActorBootstrapped<TBasicResponderActor> {
- TDeque<TActorId> RespondTo;
+ TDeque<TActorId> RespondTo;
public:
TBasicResponderActor() {
@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) {
}
void Run() override {
- TActorId child = GetActorContext().Register(new TBasicResponderActor);
+ TActorId child = GetActorContext().Register(new TBasicResponderActor);
ui32 itemsProcessed = 0;
try {
while (!Finish) {
diff --git a/library/cpp/actors/core/actorid.cpp b/library/cpp/actors/core/actorid.cpp
index ccda035eac..3330cd0ab3 100644
--- a/library/cpp/actors/core/actorid.cpp
+++ b/library/cpp/actors/core/actorid.cpp
@@ -3,18 +3,18 @@
#include <util/string/cast.h>
namespace NActors {
- void TActorId::Out(IOutputStream& o) const {
+ void TActorId::Out(IOutputStream& o) const {
o << "[" << NodeId() << ":" << LocalId() << ":" << Hint() << "]";
}
- TString TActorId::ToString() const {
+ TString TActorId::ToString() const {
TString x;
TStringOutput o(x);
Out(o);
return x;
}
- bool TActorId::Parse(const char* buf, ui32 sz) {
+ bool TActorId::Parse(const char* buf, ui32 sz) {
if (sz < 4 || buf[0] != '[' || buf[sz - 1] != ']')
return false;
diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h
index d972b1a0ff..c631ef3a72 100644
--- a/library/cpp/actors/core/actorid.h
+++ b/library/cpp/actors/core/actorid.h
@@ -11,7 +11,7 @@ namespace NActors {
// next 11 bits of node-id - pool id
// next 20 bits - node id itself
- struct TActorId {
+ struct TActorId {
static constexpr ui32 MaxServiceIDLength = 12;
static constexpr ui32 MaxPoolID = 0x000007FF;
static constexpr ui32 MaxNodeId = 0x000FFFFF;
@@ -37,19 +37,19 @@ namespace NActors {
} Raw;
public:
- TActorId() noexcept {
+ TActorId() noexcept {
Raw.X.X1 = 0;
Raw.X.X2 = 0;
}
- explicit TActorId(ui32 nodeId, ui32 poolId, ui64 localId, ui32 hint) noexcept {
+ explicit TActorId(ui32 nodeId, ui32 poolId, ui64 localId, ui32 hint) noexcept {
Y_VERIFY_DEBUG(poolId <= MaxPoolID);
Raw.N.LocalId = localId;
Raw.N.Hint = hint;
Raw.N.NodeId = nodeId | (poolId << PoolIndexShift);
}
- explicit TActorId(ui32 nodeId, const TStringBuf& x) noexcept {
+ explicit TActorId(ui32 nodeId, const TStringBuf& x) noexcept {
Y_VERIFY(x.size() <= MaxServiceIDLength, "service id is too long");
Raw.N.LocalId = 0;
Raw.N.Hint = 0;
@@ -57,7 +57,7 @@ namespace NActors {
memcpy(Raw.Buf, x.data(), x.size());
}
- explicit TActorId(ui64 x1, ui64 x2) noexcept {
+ explicit TActorId(ui64 x1, ui64 x2) noexcept {
Raw.X.X1 = x1;
Raw.X.X2 = x2;
}
@@ -103,7 +103,7 @@ namespace NActors {
return Raw.X.X2;
}
- bool operator<(const TActorId& x) const noexcept {
+ bool operator<(const TActorId& x) const noexcept {
const ui64 s1 = Raw.X.X1;
const ui64 s2 = Raw.X.X2;
const ui64 x1 = x.Raw.X.X1;
@@ -112,11 +112,11 @@ namespace NActors {
return (s1 != x1) ? (s1 < x1) : (s2 < x2);
}
- bool operator!=(const TActorId& x) const noexcept {
+ bool operator!=(const TActorId& x) const noexcept {
return Raw.X.X1 != x.Raw.X.X1 || Raw.X.X2 != x.Raw.X.X2;
}
- bool operator==(const TActorId& x) const noexcept {
+ bool operator==(const TActorId& x) const noexcept {
return !(x != *this);
}
@@ -153,19 +153,19 @@ namespace NActors {
}
struct THash {
- ui64 operator()(const TActorId& actorId) const noexcept {
- return actorId.Hash();
+ ui64 operator()(const TActorId& actorId) const noexcept {
+ return actorId.Hash();
}
};
struct THash32 {
- ui64 operator()(const TActorId& actorId) const noexcept {
- return actorId.Hash();
+ ui64 operator()(const TActorId& actorId) const noexcept {
+ return actorId.Hash();
}
};
struct TOrderedCmp {
- bool operator()(const TActorId &left, const TActorId &right) const noexcept {
+ bool operator()(const TActorId &left, const TActorId &right) const noexcept {
Y_VERIFY_DEBUG(!left.IsService() && !right.IsService(), "ordered compare works for plain actorids only");
const ui32 n1 = left.NodeId();
const ui32 n2 = right.NodeId();
@@ -179,18 +179,18 @@ namespace NActors {
bool Parse(const char* buf, ui32 sz);
};
- static_assert(sizeof(TActorId) == 16, "expect sizeof(TActorId) == 16");
+ static_assert(sizeof(TActorId) == 16, "expect sizeof(TActorId) == 16");
static_assert(MaxPools < TActorId::MaxPoolID); // current implementation of united pool has limit MaxPools on pool id
}
template <>
-inline void Out<NActors::TActorId>(IOutputStream& o, const NActors::TActorId& x) {
+inline void Out<NActors::TActorId>(IOutputStream& o, const NActors::TActorId& x) {
return x.Out(o);
}
template <>
-struct THash<NActors::TActorId> {
- inline ui64 operator()(const NActors::TActorId& x) const {
+struct THash<NActors::TActorId> {
+ inline ui64 operator()(const NActors::TActorId& x) const {
return x.Hash();
}
};
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index c58698a206..488abd2963 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -21,16 +21,16 @@ namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
struct TActorSystem::TServiceMap : TNonCopyable {
- NActors::TServiceMap<TActorId, TActorId, TActorId::THash> LocalMap;
+ NActors::TServiceMap<TActorId, TActorId, TActorId::THash> LocalMap;
TTicketLock Lock;
- TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
+ TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
TTicketLock::TGuard guard(&Lock);
- const TActorId old = LocalMap.Update(serviceId, actorId);
+ const TActorId old = LocalMap.Update(serviceId, actorId);
return old;
}
- TActorId LookupLocal(const TActorId& x) {
+ TActorId LookupLocal(const TActorId& x) {
return LocalMap.Find(x);
}
};
@@ -68,7 +68,7 @@ namespace NActors {
ev->Callstack.TraceIfEmpty();
#endif
- TActorId recipient = ev->GetRecipientRewrite();
+ TActorId recipient = ev->GetRecipientRewrite();
const ui32 recpNodeId = recipient.NodeId();
if (recpNodeId != NodeId && recpNodeId != 0) {
@@ -114,13 +114,13 @@ namespace NActors {
return false;
}
- bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const {
+ bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const {
return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags));
}
- void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
+ void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
Schedule(deadline - Timestamp(), ev, cookie);
- }
+ }
void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
const auto current = Monotonic();
@@ -131,22 +131,22 @@ namespace NActors {
ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
}
- void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
+ void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
const auto deadline = Monotonic() + delta;
-
- TTicketLock::TGuard guard(&ScheduleLock);
- ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
+
+ TTicketLock::TGuard guard(&ScheduleLock);
+ ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
}
- TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, ui64 revolvingCounter,
- const TActorId& parentId) {
+ TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, ui64 revolvingCounter,
+ const TActorId& parentId) {
Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
(ui32)executorPool, (ui32)ExecutorPoolCount);
return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
}
NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType,
- TActorId recipient, THolder<IEventBase> event,
+ TActorId recipient, THolder<IEventBase> event,
TDuration timeout) {
auto promise = NThreading::NewPromise<THolder<IEventBase>>();
Register(MakeAskActor(expectedEventType, recipient, std::move(event), timeout, promise).Release());
@@ -173,16 +173,16 @@ namespace NActors {
return ret;
}
- TActorId TActorSystem::InterconnectProxy(ui32 destinationNode) const {
+ TActorId TActorSystem::InterconnectProxy(ui32 destinationNode) const {
if (destinationNode < InterconnectCount)
return Interconnect[destinationNode];
else if (destinationNode != NodeId)
return MakeInterconnectProxyId(destinationNode);
else
- return TActorId();
+ return TActorId();
}
- ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) {
+ ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) {
// TODO: get rid of this method
for (ui32 i = 0; i < InterconnectCount; ++i) {
Send(eventFabric(Interconnect[i]));
@@ -194,9 +194,9 @@ namespace NActors {
return ServiceMap->LookupLocal(x);
}
- TActorId TActorSystem::RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
- // TODO: notify old actor about demotion
- return ServiceMap->RegisterLocalService(serviceId, actorId);
+ TActorId TActorSystem::RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
+ // TODO: notify old actor about demotion
+ return ServiceMap->RegisterLocalService(serviceId, actorId);
}
void TActorSystem::GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
@@ -217,7 +217,7 @@ namespace NActors {
// setup interconnect proxies
{
const TInterconnectSetup& setup = SystemSetup->Interconnect;
- Interconnect.Reset(new TActorId[InterconnectCount + 1]);
+ Interconnect.Reset(new TActorId[InterconnectCount + 1]);
for (ui32 i = 0, e = InterconnectCount; i != e; ++i) {
const TActorSetupCmd& x = setup.ProxyActors[i];
if (x.Actor) {
@@ -231,8 +231,8 @@ namespace NActors {
// setup local services
{
for (ui32 i = 0, e = (ui32)SystemSetup->LocalServices.size(); i != e; ++i) {
- const std::pair<TActorId, TActorSetupCmd>& x = SystemSetup->LocalServices[i];
- const TActorId xid = Register(x.second.Actor, x.second.MailboxType, x.second.PoolId, i);
+ const std::pair<TActorId, TActorSetupCmd>& x = SystemSetup->LocalServices[i];
+ const TActorId xid = Register(x.second.Actor, x.second.MailboxType, x.second.PoolId, i);
Y_VERIFY(!!xid);
if (!!x.first)
RegisterLocalService(x.first, xid);
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 40499d7586..c2c88ed2ec 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -66,17 +66,17 @@ namespace NActors {
virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0;
virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0;
- /**
- * Schedule one-shot event that will be send at given time point in the future.
- *
+ /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
* @param deadline the wallclock time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
* @param workerId index of thread which will perform event dispatching
- */
+ */
virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
-
- /**
+
+ /**
* Schedule one-shot event that will be send at given time point in the future.
*
* @param deadline the monotonic time point in future when event must be send
@@ -87,21 +87,21 @@ namespace NActors {
virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
/**
- * Schedule one-shot event that will be send after given delay.
- *
- * @param delta the time from now to delay event sending
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
+ * Schedule one-shot event that will be send after given delay.
+ *
+ * @param delta the time from now to delay event sending
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
* @param workerId index of thread which will perform event dispatching
- */
+ */
virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
-
+
// for actorsystem
virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0;
virtual void ScheduleActivation(ui32 activation) = 0;
virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0;
- virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0;
- virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0;
+ virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0;
+ virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0;
// lifecycle stuff
virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0;
@@ -223,7 +223,7 @@ namespace NActors {
THolder<TServiceMap> ServiceMap;
const ui32 InterconnectCount;
- TArrayHolder<TActorId> Interconnect;
+ TArrayHolder<TActorId> Interconnect;
volatile ui64 CurrentTimestamp;
volatile ui64 CurrentMonotonic;
@@ -235,7 +235,7 @@ namespace NActors {
friend class TExecutorThread;
THolder<TActorSystemSetup> SystemSetup;
- TActorId DefSelfID;
+ TActorId DefSelfID;
void* AppData0;
TIntrusivePtr<NLog::TSettings> LoggerSettings0;
TProxyWrapperFactory ProxyWrapperFactory;
@@ -255,22 +255,22 @@ namespace NActors {
void Stop();
void Cleanup();
- TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
- ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());
+ TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
+ ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());
bool Send(TAutoPtr<IEventHandle> ev) const;
- bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0) const;
+ bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0) const;
/**
- * Schedule one-shot event that will be send at given time point in the future.
- *
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
* @param deadline the wallclock time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
-
- /**
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
+
+ /**
* Schedule one-shot event that will be send at given time point in the future.
*
* @param deadline the monotonic time point in future when event must be send
@@ -280,15 +280,15 @@ namespace NActors {
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
/**
- * Schedule one-shot event that will be send after given delay.
- *
- * @param delta the time from now to delay event sending
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
-
- /**
+ * Schedule one-shot event that will be send after given delay.
+ *
+ * @param delta the time from now to delay event sending
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
+
+ /**
* A way to interact with actors from non-actor context.
*
* This method will send the `event` to the `recipient` and then will wait for a response. When response arrives,
@@ -303,7 +303,7 @@ namespace NActors {
*/
template <typename T>
[[nodiscard]]
- NThreading::TFuture<THolder<T>> Ask(TActorId recipient, THolder<IEventBase> event, TDuration timeout = TDuration::Max()) {
+ NThreading::TFuture<THolder<T>> Ask(TActorId recipient, THolder<IEventBase> event, TDuration timeout = TDuration::Max()) {
if constexpr (std::is_same_v<T, IEventBase>) {
return AskGeneric(Nothing(), recipient, std::move(event), timeout);
} else {
@@ -317,20 +317,20 @@ namespace NActors {
[[nodiscard]]
NThreading::TFuture<THolder<IEventBase>> AskGeneric(
TMaybe<ui32> expectedEventType,
- TActorId recipient,
+ TActorId recipient,
THolder<IEventBase> event,
TDuration timeout);
ui64 AllocateIDSpace(ui64 count);
- TActorId InterconnectProxy(ui32 destinationNode) const;
- ui32 BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>&);
+ TActorId InterconnectProxy(ui32 destinationNode) const;
+ ui32 BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>&);
void UpdateLinkStatus(ui8 status, ui32 destinationNode);
ui8 LinkStatus(ui32 destinationNode);
TActorId LookupLocalService(const TActorId& x) const;
- TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId);
+ TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId);
ui32 GetMaxActivityType() const {
return SystemSetup ? SystemSetup->MaxActivityType : 1;
diff --git a/library/cpp/actors/core/actorsystem_ut.cpp b/library/cpp/actors/core/actorsystem_ut.cpp
index 231d6f0ca1..2c93b12dcd 100644
--- a/library/cpp/actors/core/actorsystem_ut.cpp
+++ b/library/cpp/actors/core/actorsystem_ut.cpp
@@ -1,45 +1,45 @@
-#include "actorsystem.h"
-
-#include <library/cpp/actors/testlib/test_runtime.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NActors;
-
-Y_UNIT_TEST_SUITE(TActorSystemTest) {
-
- class TTestActor: public TActor<TTestActor> {
- public:
- TTestActor()
- : TActor{&TThis::Main}
- {
- }
-
- STATEFN(Main) {
- Y_UNUSED(ev);
- }
- };
-
- THolder<TTestActorRuntimeBase> CreateRuntime() {
- auto runtime = MakeHolder<TTestActorRuntimeBase>();
- runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; });
- runtime->Initialize();
- return runtime;
- }
-
- Y_UNIT_TEST(LocalService) {
- THolder<TTestActorRuntimeBase> runtime = CreateRuntime();
- auto actorA = runtime->Register(new TTestActor);
- auto actorB = runtime->Register(new TTestActor);
-
- TActorId myServiceId{0, TStringBuf{"my-service"}};
-
- auto prevActorId = runtime->RegisterService(myServiceId, actorA);
- UNIT_ASSERT(!prevActorId);
- UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorA);
-
- prevActorId = runtime->RegisterService(myServiceId, actorB);
- UNIT_ASSERT(prevActorId);
- UNIT_ASSERT_EQUAL(prevActorId, actorA);
- UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorB);
- }
-}
+#include "actorsystem.h"
+
+#include <library/cpp/actors/testlib/test_runtime.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NActors;
+
+Y_UNIT_TEST_SUITE(TActorSystemTest) {
+
+ class TTestActor: public TActor<TTestActor> {
+ public:
+ TTestActor()
+ : TActor{&TThis::Main}
+ {
+ }
+
+ STATEFN(Main) {
+ Y_UNUSED(ev);
+ }
+ };
+
+ THolder<TTestActorRuntimeBase> CreateRuntime() {
+ auto runtime = MakeHolder<TTestActorRuntimeBase>();
+ runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; });
+ runtime->Initialize();
+ return runtime;
+ }
+
+ Y_UNIT_TEST(LocalService) {
+ THolder<TTestActorRuntimeBase> runtime = CreateRuntime();
+ auto actorA = runtime->Register(new TTestActor);
+ auto actorB = runtime->Register(new TTestActor);
+
+ TActorId myServiceId{0, TStringBuf{"my-service"}};
+
+ auto prevActorId = runtime->RegisterService(myServiceId, actorA);
+ UNIT_ASSERT(!prevActorId);
+ UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorA);
+
+ prevActorId = runtime->RegisterService(myServiceId, actorB);
+ UNIT_ASSERT(prevActorId);
+ UNIT_ASSERT_EQUAL(prevActorId, actorA);
+ UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorB);
+ }
+}
diff --git a/library/cpp/actors/core/ask.cpp b/library/cpp/actors/core/ask.cpp
index 0054c9a906..4e4d6a1a2b 100644
--- a/library/cpp/actors/core/ask.cpp
+++ b/library/cpp/actors/core/ask.cpp
@@ -19,7 +19,7 @@ namespace NActors {
public:
TAskActor(
TMaybe<ui32> expectedEventType,
- TActorId recipient,
+ TActorId recipient,
THolder<IEventBase> event,
TDuration timeout,
const NThreading::TPromise<THolder<IEventBase>>& promise)
@@ -55,7 +55,7 @@ namespace NActors {
public:
TMaybe<ui32> ExpectedEventType_;
- TActorId Recipient_;
+ TActorId Recipient_;
THolder<IEventBase> Event_;
TDuration Timeout_;
NThreading::TPromise<THolder<IEventBase>> Promise_;
@@ -64,7 +64,7 @@ namespace NActors {
THolder<IActor> MakeAskActor(
TMaybe<ui32> expectedEventType,
- TActorId recipient,
+ TActorId recipient,
THolder<IEventBase> event,
TDuration timeout,
const NThreading::TPromise<THolder<IEventBase>>& promise)
diff --git a/library/cpp/actors/core/ask.h b/library/cpp/actors/core/ask.h
index 036f1833a4..b935fac564 100644
--- a/library/cpp/actors/core/ask.h
+++ b/library/cpp/actors/core/ask.h
@@ -11,7 +11,7 @@ namespace NActors {
*/
THolder<IActor> MakeAskActor(
TMaybe<ui32> expectedEventType,
- TActorId recipient,
+ TActorId recipient,
THolder<IEventBase> event,
TDuration timeout,
const NThreading::TPromise<THolder<IEventBase>>& promise);
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 6ff02aaf94..9519978bc1 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -48,9 +48,9 @@ namespace NActors {
// fat handle
class IEventHandle : TNonCopyable {
struct TOnNondelivery {
- TActorId Recipient;
+ TActorId Recipient;
- TOnNondelivery(const TActorId& recipient)
+ TOnNondelivery(const TActorId& recipient)
: Recipient(recipient)
{
}
@@ -99,8 +99,8 @@ namespace NActors {
const ui32 Type;
const ui32 Flags;
- const TActorId Recipient;
- const TActorId Sender;
+ const TActorId Recipient;
+ const TActorId Sender;
const ui64 Cookie;
const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect
@@ -108,7 +108,7 @@ namespace NActors {
NWilson::TTraceId TraceId;
// filled if feeded by interconnect session
- const TActorId InterconnectSession;
+ const TActorId InterconnectSession;
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
::NHPTimer::STime SendTime;
@@ -138,13 +138,13 @@ namespace NActors {
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
- TActorId RewriteRecipient;
+ TActorId RewriteRecipient;
ui32 RewriteType;
THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
public:
- void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
+ void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
RewriteRecipient = recipientRewrite;
RewriteType = typeRewrite;
}
@@ -154,7 +154,7 @@ namespace NActors {
RewriteType = Type;
}
- const TActorId& GetRecipientRewrite() const {
+ const TActorId& GetRecipientRewrite() const {
return RewriteRecipient;
}
@@ -162,12 +162,12 @@ namespace NActors {
return RewriteType;
}
- TActorId GetForwardOnNondeliveryRecipient() const {
- return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
+ TActorId GetForwardOnNondeliveryRecipient() const {
+ return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
}
- IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
- const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
+ IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
+ const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
: Type(ev->Type())
, Flags(flags)
, Recipient(recipient)
@@ -187,11 +187,11 @@ namespace NActors {
IEventHandle(ui32 type,
ui32 flags,
- const TActorId& recipient,
- const TActorId& sender,
+ const TActorId& recipient,
+ const TActorId& sender,
TIntrusivePtr<TEventSerializedData> buffer,
ui64 cookie,
- const TActorId* forwardOnNondelivery = nullptr,
+ const TActorId* forwardOnNondelivery = nullptr,
NWilson::TTraceId traceId = {})
: Type(type)
, Flags(flags)
@@ -211,11 +211,11 @@ namespace NActors {
}
// Special ctor for events from interconnect.
- IEventHandle(const TActorId& session,
+ IEventHandle(const TActorId& session,
ui32 type,
ui32 flags,
- const TActorId& recipient,
- const TActorId& sender,
+ const TActorId& recipient,
+ const TActorId& sender,
TIntrusivePtr<TEventSerializedData> buffer,
ui64 cookie,
TScopeId originScopeId,
@@ -276,7 +276,7 @@ namespace NActors {
return x;
}
- TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
+ TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
if (Event)
return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
else
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index d7546b901a..baaf333ca9 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -488,11 +488,11 @@ namespace NActors {
}
};
- inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) {
- return TActorId(actorId.GetRawX1(), actorId.GetRawX2());
+ inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) {
+ return TActorId(actorId.GetRawX1(), actorId.GetRawX2());
}
- inline void ActorIdToProto(const TActorId& src, NActorsProto::TActorId* dest) {
+ inline void ActorIdToProto(const TActorId& src, NActorsProto::TActorId* dest) {
Y_VERIFY_DEBUG(dest);
dest->SetRawX1(src.RawX1());
dest->SetRawX2(src.RawX2());
diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h
index 702cf50fad..b5b9d7c9fa 100644
--- a/library/cpp/actors/core/events.h
+++ b/library/cpp/actors/core/events.h
@@ -20,7 +20,7 @@ namespace NActors {
ES_INTERCONNECT_TCP = 8,
ES_PROFILER = 9,
ES_YF = 10,
- ES_HTTP = 11,
+ ES_HTTP = 11,
ES_USERSPACE = 4096,
@@ -99,7 +99,7 @@ namespace NActors {
InvokeQuery,
End,
- // Compatibility section
+ // Compatibility section
PoisonPill = Poison,
ActorDied = Gone,
};
@@ -191,17 +191,17 @@ namespace NActors {
struct TEvCallbackException: public TEventPB<TEvCallbackException,
NActorsProto::TCallbackException,
TSystem::CallbackException> {
- TEvCallbackException(const TActorId& id, const TString& msg) {
- ActorIdToProto(id, Record.MutableActorId());
+ TEvCallbackException(const TActorId& id, const TString& msg) {
+ ActorIdToProto(id, Record.MutableActorId());
Record.SetExceptionMessage(msg);
}
};
struct TEvCallbackCompletion: public TEventPB<TEvCallbackCompletion,
- NActorsProto::TActorId,
+ NActorsProto::TActorId,
TSystem::CallbackCompletion> {
- TEvCallbackCompletion(const TActorId& id) {
- ActorIdToProto(id, &Record);
+ TEvCallbackCompletion(const TActorId& id) {
+ ActorIdToProto(id, &Record);
}
};
diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp
index 23deaffd10..2a5a0b1cc6 100644
--- a/library/cpp/actors/core/events_undelivered.cpp
+++ b/library/cpp/actors/core/events_undelivered.cpp
@@ -41,7 +41,7 @@ namespace NActors {
TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(ui32 reason, bool unsure) {
if (Flags & FlagForwardOnNondelivery) {
const ui32 updatedFlags = Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession);
- const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId();
+ const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId();
if (Event)
return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, TraceId.Clone());
diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h
index e7a13c1005..ec55c43b40 100644
--- a/library/cpp/actors/core/executelater.h
+++ b/library/cpp/actors/core/executelater.h
@@ -17,8 +17,8 @@ namespace NActors {
IActor::EActivityType activityType,
ui32 channel = 0,
ui64 cookie = 0,
- const TActorId& reportCompletionTo = TActorId(),
- const TActorId& reportExceptionTo = TActorId()) noexcept
+ const TActorId& reportCompletionTo = TActorId(),
+ const TActorId& reportExceptionTo = TActorId()) noexcept
: Callback(std::move(callback))
, Channel(channel)
, Cookie(cookie)
@@ -65,8 +65,8 @@ namespace NActors {
TCallback Callback;
const ui32 Channel;
const ui64 Cookie;
- const TActorId ReportCompletionTo;
- const TActorId ReportExceptionTo;
+ const TActorId ReportCompletionTo;
+ const TActorId ReportExceptionTo;
};
template <typename T>
@@ -75,8 +75,8 @@ namespace NActors {
IActor::EActivityType activityType,
ui32 channel = 0,
ui64 cookie = 0,
- const TActorId& reportCompletionTo = TActorId(),
- const TActorId& reportExceptionTo = TActorId()) noexcept {
+ const TActorId& reportCompletionTo = TActorId(),
+ const TActorId& reportExceptionTo = TActorId()) noexcept {
return new TExecuteLater<T>(std::forward<T>(func),
activityType,
channel,
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index c3b9999168..860496f108 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -7,7 +7,7 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) {
+ void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) {
actor->SelfActorId = self;
actor->Registered(sys, owner);
}
@@ -97,7 +97,7 @@ namespace NActors {
mailbox->AttachActor(localActorId, actor);
// do init
- const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
+ const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
DoActorInit(ActorSystem, actor, actorId, parentId);
// Once we unlock the mailbox the actor starts running and we cannot use the pointer any more
@@ -144,7 +144,7 @@ namespace NActors {
const ui64 localActorId = AllocateID();
mailbox->AttachActor(localActorId, actor);
- const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
+ const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
DoActorInit(ActorSystem, actor, actorId, parentId);
NHPTimer::STime elapsed = GetCycleCountFast() - hpstart;
if (elapsed > 1000000) {
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index c84ce1af77..d52a242fc6 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -24,8 +24,8 @@ namespace NActors {
~TExecutorPoolBaseMailboxed();
void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingWriteCounter) override;
bool Send(TAutoPtr<IEventHandle>& ev) override;
- TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override;
- TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override;
+ TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override;
+ TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override;
bool Cleanup() override;
};
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 4dce16939a..936d2e94a7 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -315,11 +315,11 @@ namespace NActors {
void TBasicExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
Y_VERIFY_DEBUG(workerId < PoolThreads);
-
+
const auto deadline = ActorSystem->Monotonic() + delta;
ScheduleWriters[workerId].Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
-
+ }
+
void TBasicExecutorPool::SetRealTimeMode() const {
// TODO: musl-libc version of `sched_param` struct is for some reason different from pthread
// version in Ubuntu 12.04
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 023190f7fe..dd83c85c74 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -6,7 +6,7 @@
#include "executor_pool_base.h"
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/actors/util/threadparkpad.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <util/system/mutex.h>
@@ -87,7 +87,7 @@ namespace NActors {
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
-
+
void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override;
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 76dff693af..8c170c2d84 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -25,7 +25,7 @@ private:
private:
TAtomic Counter;
- TActorId Receiver;
+ TActorId Receiver;
std::function<void(void)> Action;
@@ -36,7 +36,7 @@ public:
, Action(action)
{}
- void Start(TActorId receiver, size_t count)
+ void Start(TActorId receiver, size_t count)
{
AtomicSet(Counter, count);
Receiver = receiver;
@@ -102,7 +102,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
executorPool->SetThreadCount(halfSize);
TTestSenderActor* actors[size];
- TActorId actorIds[size];
+ TActorId actorIds[size];
for (size_t i = 0; i < size; ++i) {
actors[i] = new TTestSenderActor();
actorIds[i] = actorSystem.Register(actors[i]);
@@ -176,7 +176,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
executorPool->SetThreadCount(halfSize);
TTestSenderActor* actors[size];
- TActorId actorIds[size];
+ TActorId actorIds[size];
for (size_t i = 0; i < size; ++i) {
actors[i] = new TTestSenderActor();
actorIds[i] = actorSystem.Register(actors[i]);
@@ -201,7 +201,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
counter = 0;
}
});
- TActorId changerActorId = actorSystem.Register(changerActor);
+ TActorId changerActorId = actorSystem.Register(changerActor);
changerActor->Start(changerActorId, msgCount);
actorSystem.Send(changerActorId, new TEvMsg());
@@ -260,7 +260,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
auto begin = TInstant::Now();
TTestSenderActor* actors[size];
- TActorId actorIds[size];
+ TActorId actorIds[size];
for (size_t i = 0; i < size; ++i) {
actors[i] = new TTestSenderActor();
@@ -304,7 +304,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
auto begin = TInstant::Now();
TTestSenderActor* actors[actorsCount];
- TActorId actorIds[actorsCount];
+ TActorId actorIds[actorsCount];
for (size_t i = 0; i < actorsCount; ++i) {
actors[i] = new TTestSenderActor();
@@ -348,7 +348,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
auto begin = TInstant::Now();
TTestSenderActor* actors[actorsCount];
- TActorId actorIds[actorsCount];
+ TActorId actorIds[actorsCount];
for (size_t i = 0; i < actorsCount; ++i) {
actors[i] = new TTestSenderActor();
diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp
index fb557ae6b0..025b5a22c2 100644
--- a/library/cpp/actors/core/executor_pool_io.cpp
+++ b/library/cpp/actors/core/executor_pool_io.cpp
@@ -81,11 +81,11 @@ namespace NActors {
void TIOExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
Y_UNUSED(workerId);
const auto deadline = ActorSystem->Monotonic() + delta;
-
- TTicketLock::TGuard guard(&ScheduleLock);
- ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
-
+
+ TTicketLock::TGuard guard(&ScheduleLock);
+ ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
+ }
+
void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) {
Activations.Push(activation, revolvingWriteCounter);
const TAtomic x = AtomicIncrement(Semaphore);
diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h
index e576d642a1..a1359ba4ab 100644
--- a/library/cpp/actors/core/executor_pool_io.h
+++ b/library/cpp/actors/core/executor_pool_io.h
@@ -35,7 +35,7 @@ namespace NActors {
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
-
+
void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override;
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index dac6245635..da4934eccd 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -1255,13 +1255,13 @@ namespace NActors {
inline bool TUnitedWorkers::NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) {
return Pools[pool].NextExecution(activation, revolvingCounter);
}
-
+
inline void TUnitedWorkers::StopExecution(TPoolId pool) {
if (Pools[pool].StopExecution()) { // pending token
TryWake(pool);
}
- }
-
+ }
+
inline void TUnitedWorkers::Balance() {
ui64 ts = GetCycleCountFast();
if (Balancer->TryLock(ts)) {
diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h
index a090ba2466..b1af850312 100644
--- a/library/cpp/actors/core/executor_pool_united.h
+++ b/library/cpp/actors/core/executor_pool_united.h
@@ -7,7 +7,7 @@
#include <library/cpp/actors/util/unordered_cache.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -79,7 +79,7 @@ namespace NActors {
// Stop currently active execution and start new one if token is available
// NOTE: Reuses token if it's not destroyed
bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
-
+
// Stop active execution
void StopExecution(TPoolId pool);
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index d4df17f1b8..be92b9352a 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -37,7 +37,7 @@ private:
private:
TAtomic Counter;
- TActorId Receiver;
+ TActorId Receiver;
std::function<void(void)> Action;
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 446b651efd..03ef88ea51 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -50,14 +50,14 @@ namespace NActors {
&Ctx.WorkerStats);
}
- TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, const TActorId& parentId) {
+ TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, const TActorId& parentId) {
if (poolId == Max<ui32>())
return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient);
else
return ActorSystem->Register(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient);
}
- TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) {
+ TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) {
return Ctx.Executor->Register(actor, mailbox, hint, parentId ? parentId : CurrentRecipient);
}
@@ -71,7 +71,7 @@ namespace NActors {
DyingActors.clear(); // here is actual destruction of actors
}
- void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId);
}
@@ -81,11 +81,11 @@ namespace NActors {
Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId);
}
- void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
- ++CurrentActorScheduledEventsCounter;
+ void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ ++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(delta, ev, cookie, Ctx.WorkerId);
- }
-
+ }
+
template <class T>
inline TString SafeTypeName(T* t) {
if (t == nullptr) {
@@ -102,7 +102,7 @@ namespace NActors {
return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)");
}
- inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient,
+ inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient,
double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) {
const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr;
LWPROBE(EventSlowDelivery,
@@ -116,7 +116,7 @@ namespace NActors {
}
inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const IActor* actor, ui32 poolId, ui32 activityType,
- const TActorId& currentRecipient, double eventMs) {
+ const TActorId& currentRecipient, double eventMs) {
// Event could have been destroyed by actor->Receive();
const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr;
LWPROBE(SlowEvent,
@@ -198,7 +198,7 @@ namespace NActors {
if (actor)
actor->AddElapsedTicks(elapsed);
- CurrentRecipient = TActorId();
+ CurrentRecipient = TActorId();
} else {
TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
if (nonDelivered.Get()) {
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 9d3c573f0d..f3f1d527d6 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -39,17 +39,17 @@ namespace NActors {
: TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox)
{}
- TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(),
- const TActorId& parentId = TActorId());
- TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId());
+ TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(),
+ const TActorId& parentId = TActorId());
+ TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId());
void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId);
void DropUnregistered();
const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; }
- void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+ void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
- void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
-
+ void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+
bool Send(TAutoPtr<IEventHandle> ev) {
#ifdef USE_ACTOR_CALLSTACK
ev->Callstack = TCallstack::GetTlsCallstack();
@@ -81,7 +81,7 @@ namespace NActors {
// Event-specific (currently executing)
TVector<THolder<IActor>> DyingActors;
- TActorId CurrentRecipient;
+ TActorId CurrentRecipient;
ui64 CurrentActorScheduledEventsCounter = 0;
// Thread-specific
diff --git a/library/cpp/actors/core/invoke.h b/library/cpp/actors/core/invoke.h
index 931a9767dd..26de350a95 100644
--- a/library/cpp/actors/core/invoke.h
+++ b/library/cpp/actors/core/invoke.h
@@ -92,7 +92,7 @@ namespace NActors {
, Complete(std::move(complete))
{}
- void Bootstrap(const TActorId& parentId, const TActorContext& ctx) {
+ void Bootstrap(const TActorId& parentId, const TActorContext& ctx) {
auto process = [complete = std::move(Complete)](TEvents::TEvInvokeResult& res, const TActorContext& ctx) {
complete([&] { return res.GetResult<TCallback>(); }, ctx);
};
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 5f63b5af58..88e24f4c01 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -1,7 +1,7 @@
#include "log.h"
#include "log_settings.h"
-#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/monlib/service/pages/templates.h>
static_assert(int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG), "expect int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG)");
static_assert(int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT), "expect int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT)");
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index c11a7cf3c1..c7b6e85bef 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -14,7 +14,7 @@
#include <util/string/printf.h>
#include <util/string/builder.h>
#include <library/cpp/logger/all.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <library/cpp/json/writer/json.h>
#include <library/cpp/svnversion/svnversion.h>
@@ -323,7 +323,7 @@ namespace NActors {
{
const NLog::TSettings *mSettings = ctx.LoggerSettings();
TLoggerActor::Throttle(*mSettings);
- ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
+ ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
}
template <typename TCtx, typename... TArgs>
diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp
index f52f2fc5d2..de2a3a9a68 100644
--- a/library/cpp/actors/core/log_settings.cpp
+++ b/library/cpp/actors/core/log_settings.cpp
@@ -4,7 +4,7 @@
namespace NActors {
namespace NLog {
- TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
+ TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EComponent minVal, EComponent maxVal, EComponentToStringFunc func,
EPriority defPriority, EPriority defSamplingPriority,
ui32 defSamplingRate, ui64 timeThresholdMs)
@@ -27,7 +27,7 @@ namespace NActors {
Append(minVal, maxVal, func);
}
- TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
+ TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EPriority defPriority, EPriority defSamplingPriority,
ui32 defSamplingRate, ui64 timeThresholdMs)
: LoggerActorId(loggerActorId)
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index 7fe4504edd..564d2db73e 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -69,7 +69,7 @@ namespace NActors {
struct TSettings: public TThrRefBase {
public:
- TActorId LoggerActorId;
+ TActorId LoggerActorId;
EComponent LoggerComponent;
ui64 TimeThresholdMs;
bool AllowDrop;
@@ -98,12 +98,12 @@ namespace NActors {
// protobuf enumeration of components. In this case protoc
// automatically generates YOURTYPE_MIN, YOURTYPE_MAX and
// YOURTYPE_Name for you.
- TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
+ TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EComponent minVal, EComponent maxVal, EComponentToStringFunc func,
EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG,
ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000);
- TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
+ TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG,
ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000);
diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp
index 09b5f88ea2..2a65e270de 100644
--- a/library/cpp/actors/core/log_ut.cpp
+++ b/library/cpp/actors/core/log_ut.cpp
@@ -15,7 +15,7 @@ namespace {
}
TIntrusivePtr<TSettings> DefaultSettings() {
- auto loggerId = TActorId{0, "Logger"};
+ auto loggerId = TActorId{0, "Logger"};
auto s = MakeIntrusive<TSettings>(loggerId, 0, EPriority::PRI_TRACE);
s->SetAllowDrop(false);
s->Append(0, 1, ServiceToString);
@@ -98,7 +98,7 @@ namespace {
TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()};
std::shared_ptr<TMockBackend> LogBackend;
- TActorId LoggerActor;
+ TActorId LoggerActor;
TTestActorRuntimeBase Runtime;
};
}
diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp
index d84b4f9e46..40fcebaa72 100644
--- a/library/cpp/actors/core/mailbox.cpp
+++ b/library/cpp/actors/core/mailbox.cpp
@@ -163,7 +163,7 @@ namespace NActors {
}
bool TMailboxTable::SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) {
- const TActorId& recipient = ev->GetRecipientRewrite();
+ const TActorId& recipient = ev->GetRecipientRewrite();
const ui32 hint = recipient.Hint();
// copy-paste from Get to avoid duplicated type-switches
diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h
index 0bd9c4d314..38a03af42d 100644
--- a/library/cpp/actors/core/mailbox.h
+++ b/library/cpp/actors/core/mailbox.h
@@ -305,14 +305,14 @@ namespace NActors {
static const ui32 LineIndexShift = 12;
static const ui32 LineIndexMask = 0x1FFFFu << LineIndexShift;
static const ui32 LineHintMask = 0xFFFu;
- static const ui32 PoolIndexShift = TActorId::PoolIndexShift;
- static const ui32 PoolIndexMask = TActorId::PoolIndexMask;
+ static const ui32 PoolIndexShift = TActorId::PoolIndexShift;
+ static const ui32 PoolIndexMask = TActorId::PoolIndexMask;
static ui32 LineIndex(ui32 hint) {
return ((hint & LineIndexMask) >> LineIndexShift);
}
static ui32 PoolIndex(ui32 hint) {
- return TActorId::PoolIndex(hint);
+ return TActorId::PoolIndex(hint);
}
TMailboxHeader* Get(ui32 hint);
diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h
index c450f2338e..45e0e7ff65 100644
--- a/library/cpp/actors/core/mon.h
+++ b/library/cpp/actors/core/mon.h
@@ -2,8 +2,8 @@
#include "events.h"
#include "event_local.h"
-#include <library/cpp/monlib/service/monservice.h>
-#include <library/cpp/monlib/service/pages/mon_page.h>
+#include <library/cpp/monlib/service/monservice.h>
+#include <library/cpp/monlib/service/pages/mon_page.h>
namespace NActors {
namespace NMon {
diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp
index 0e1dbd0031..61bf7452a7 100644
--- a/library/cpp/actors/core/process_stats.cpp
+++ b/library/cpp/actors/core/process_stats.cpp
@@ -3,8 +3,8 @@
#include "hfunc.h"
#include "process_stats.h"
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/datetime/uptime.h>
#include <util/system/defaults.h>
@@ -197,7 +197,7 @@ namespace {
MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true);
MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true);
UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false);
- NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false);
+ NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false);
SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false);
}
@@ -213,7 +213,7 @@ namespace {
*MinorPageFaults = procStat.MinFlt;
*MajorPageFaults = procStat.MajFlt;
*UptimeSeconds = procStat.Uptime.Seconds();
- *NumThreads = procStat.NumThreads;
+ *NumThreads = procStat.NumThreads;
*SystemUptimeSeconds = procStat.Uptime.Seconds();
}
@@ -228,7 +228,7 @@ namespace {
NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults;
NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults;
NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds;
- NMonitoring::TDynamicCounters::TCounterPtr NumThreads;
+ NMonitoring::TDynamicCounters::TCounterPtr NumThreads;
NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds;
};
@@ -236,7 +236,7 @@ namespace {
class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> {
using TBase = TProcStatCollectingActor<TRegistryCollector>;
public:
- TRegistryCollector(TDuration interval, NMonitoring::TMetricRegistry& registry)
+ TRegistryCollector(TDuration interval, NMonitoring::TMetricRegistry& registry)
: TBase{interval}
{
VmSize = registry.IntGauge({{"sensor", "process.VmSize"}});
@@ -244,13 +244,13 @@ namespace {
FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}});
CGroupMemLimit = registry.IntGauge({{"sensor", "process.CGroupMemLimit"}});
UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}});
- NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}});
+ NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}});
SystemUptimeSeconds = registry.IntGauge({{"sensor", "system.UptimeSeconds"}});
-
- UserTime = registry.Rate({{"sensor", "process.UserTime"}});
- SysTime = registry.Rate({{"sensor", "process.SystemTime"}});
- MinorPageFaults = registry.Rate({{"sensor", "process.MinorPageFaults"}});
- MajorPageFaults = registry.Rate({{"sensor", "process.MajorPageFaults"}});
+
+ UserTime = registry.Rate({{"sensor", "process.UserTime"}});
+ SysTime = registry.Rate({{"sensor", "process.SystemTime"}});
+ MinorPageFaults = registry.Rate({{"sensor", "process.MinorPageFaults"}});
+ MajorPageFaults = registry.Rate({{"sensor", "process.MajorPageFaults"}});
}
void UpdateCounters(const TProcStat& procStat) {
@@ -259,23 +259,23 @@ namespace {
FileRssSize->Set(procStat.FileRss);
CGroupMemLimit->Set(procStat.CGroupMemLim);
UptimeSeconds->Set(procStat.Uptime.Seconds());
- NumThreads->Set(procStat.NumThreads);
+ NumThreads->Set(procStat.NumThreads);
SystemUptimeSeconds->Set(procStat.SystemUptime.Seconds());
-
- // it is ok here to reset and add metric value, because mutation
- // is performed in siglethreaded context
-
- UserTime->Reset();
- UserTime->Add(procStat.Utime);
-
- SysTime->Reset();
- SysTime->Add(procStat.Stime);
-
- MinorPageFaults->Reset();
- MinorPageFaults->Add(procStat.MinFlt);
-
- MajorPageFaults->Reset();
- MajorPageFaults->Add(procStat.MajFlt);
+
+ // it is ok here to reset and add metric value, because mutation
+ // is performed in siglethreaded context
+
+ UserTime->Reset();
+ UserTime->Add(procStat.Utime);
+
+ SysTime->Reset();
+ SysTime->Add(procStat.Stime);
+
+ MinorPageFaults->Reset();
+ MinorPageFaults->Add(procStat.MinFlt);
+
+ MajorPageFaults->Reset();
+ MajorPageFaults->Add(procStat.MajFlt);
}
private:
@@ -283,12 +283,12 @@ namespace {
NMonitoring::TIntGauge* AnonRssSize;
NMonitoring::TIntGauge* FileRssSize;
NMonitoring::TIntGauge* CGroupMemLimit;
- NMonitoring::TRate* UserTime;
- NMonitoring::TRate* SysTime;
- NMonitoring::TRate* MinorPageFaults;
- NMonitoring::TRate* MajorPageFaults;
+ NMonitoring::TRate* UserTime;
+ NMonitoring::TRate* SysTime;
+ NMonitoring::TRate* MinorPageFaults;
+ NMonitoring::TRate* MajorPageFaults;
NMonitoring::TIntGauge* UptimeSeconds;
- NMonitoring::TIntGauge* NumThreads;
+ NMonitoring::TIntGauge* NumThreads;
NMonitoring::TIntGauge* SystemUptimeSeconds;
};
} // namespace
@@ -297,7 +297,7 @@ namespace {
return new TDynamicCounterCollector(intervalSec, counters);
}
- IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) {
+ IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) {
return new TRegistryCollector(interval, registry);
}
}
diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h
index 66346d0b5a..4e6bb31090 100644
--- a/library/cpp/actors/core/process_stats.h
+++ b/library/cpp/actors/core/process_stats.h
@@ -3,10 +3,10 @@
#include "defs.h"
#include "actor.h"
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NMonitoring {
- class TMetricRegistry;
+ class TMetricRegistry;
}
namespace NActors {
@@ -62,5 +62,5 @@ namespace NActors {
};
IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters);
- IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry);
+ IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry);
}
diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp
index febc5e40dd..d1e6501fd1 100644
--- a/library/cpp/actors/core/scheduler_actor.cpp
+++ b/library/cpp/actors/core/scheduler_actor.cpp
@@ -39,7 +39,7 @@ namespace NActors {
TVector<NSchedulerQueue::TReader*> Readers;
- TActorId PollerActor;
+ TActorId PollerActor;
TPollerToken::TPtr PollerToken;
ui64 RealTime;
@@ -68,7 +68,7 @@ namespace NActors {
: TActor(&TSchedulerActor::StateFunc)
, Cfg(cfg)
, TimerDescriptor(new TTimerDescriptor())
- , PollerActor(MakePollerActorId())
+ , PollerActor(MakePollerActorId())
{
Y_ASSERT(Cfg.ResolutionMicroseconds != 0);
Y_ASSERT(Cfg.ProgressThreshold != 0);
diff --git a/library/cpp/actors/core/scheduler_actor.h b/library/cpp/actors/core/scheduler_actor.h
index c2c561b43d..600f8d98ff 100644
--- a/library/cpp/actors/core/scheduler_actor.h
+++ b/library/cpp/actors/core/scheduler_actor.h
@@ -21,9 +21,9 @@ namespace NActors {
IActor* CreateSchedulerActor(const TSchedulerConfig& cfg);
- inline TActorId MakeSchedulerActorId() {
+ inline TActorId MakeSchedulerActorId() {
char x[12] = {'s', 'c', 'h', 'e', 'd', 'u', 'l', 'e', 'r', 's', 'e', 'r'};
- return TActorId(0, TStringBuf(x, 12));
+ return TActorId(0, TStringBuf(x, 12));
}
}
diff --git a/library/cpp/actors/core/scheduler_actor_ut.cpp b/library/cpp/actors/core/scheduler_actor_ut.cpp
index 09b7369d36..dae14cbe67 100644
--- a/library/cpp/actors/core/scheduler_actor_ut.cpp
+++ b/library/cpp/actors/core/scheduler_actor_ut.cpp
@@ -60,14 +60,14 @@ Y_UNIT_TEST_SUITE(SchedulerActor) {
setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic");
}
// create poller actor (whether platform supports it)
- TActorId pollerActorId;
+ TActorId pollerActorId;
if (IActor* poller = CreatePollerActor()) {
- pollerActorId = MakePollerActorId();
+ pollerActorId = MakePollerActorId();
setup->LocalServices.emplace_back(pollerActorId, TActorSetupCmd(poller, TMailboxType::ReadAsFilled, 0));
}
- TActorId schedulerActorId;
+ TActorId schedulerActorId;
if (IActor* schedulerActor = CreateSchedulerActor(TSchedulerConfig())) {
- schedulerActorId = MakeSchedulerActorId();
+ schedulerActorId = MakeSchedulerActorId();
setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0));
}
setup->Scheduler = CreateSchedulerThread(TSchedulerConfig());
diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make
index 3ee28d5850..11b2ea3eb7 100644
--- a/library/cpp/actors/core/ut/ya.make
+++ b/library/cpp/actors/core/ut/ya.make
@@ -29,18 +29,18 @@ PEERDIR(
)
SRCS(
- actor_coroutine_ut.cpp
- actor_ut.cpp
- actorsystem_ut.cpp
+ actor_coroutine_ut.cpp
+ actor_ut.cpp
+ actorsystem_ut.cpp
ask_ut.cpp
balancer_ut.cpp
- event_pb_payload_ut.cpp
+ event_pb_payload_ut.cpp
event_pb_ut.cpp
executor_pool_basic_ut.cpp
executor_pool_united_ut.cpp
log_ut.cpp
memory_tracker_ut.cpp
- scheduler_actor_ut.cpp
+ scheduler_actor_ut.cpp
)
END()
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 880a9d00db..40f27456c8 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -111,7 +111,7 @@ PEERDIR(
library/cpp/json/writer
library/cpp/logger
library/cpp/lwtrace
- library/cpp/monlib/dynamic_counters
+ library/cpp/monlib/dynamic_counters
library/cpp/svnversion
library/cpp/threading/future
)
diff --git a/library/cpp/actors/helpers/activeactors.h b/library/cpp/actors/helpers/activeactors.h
index 0fdb0fab10..b0e4f5cc99 100644
--- a/library/cpp/actors/helpers/activeactors.h
+++ b/library/cpp/actors/helpers/activeactors.h
@@ -10,9 +10,9 @@ namespace NActors {
// TActiveActors
// This class helps manage created actors and kill them all on PoisonPill.
////////////////////////////////////////////////////////////////////////////
- class TActiveActors : public THashSet<TActorId> {
+ class TActiveActors : public THashSet<TActorId> {
public:
- void Insert(const TActorId &aid) {
+ void Insert(const TActorId &aid) {
bool inserted = insert(aid).second;
Y_VERIFY(inserted);
}
@@ -23,7 +23,7 @@ namespace NActors {
}
}
- void Erase(const TActorId &aid) {
+ void Erase(const TActorId &aid) {
auto num = erase(aid);
Y_VERIFY(num == 1);
}
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp
index d75cc54023..61610ec3d3 100644
--- a/library/cpp/actors/helpers/flow_controlled_queue.cpp
+++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp
@@ -18,12 +18,12 @@ class TFlowControlledRequestActor : public IActor {
void HandleReply(TAutoPtr<IEventHandle> &ev);
void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev);
public:
- const TActorId Source;
+ const TActorId Source;
const ui64 Cookie;
const ui32 Flags;
const ui64 StartCounter;
- TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags)
+ TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags)
: IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity)
, QueueActor(queue)
, Source(source)
@@ -49,7 +49,7 @@ public:
};
class TFlowControlledRequestQueue : public IActor {
- const TActorId Target;
+ const TActorId Target;
const TFlowControlledQueueConfig Config;
TDeque<THolder<IEventHandle>> UnhandledRequests;
@@ -123,7 +123,7 @@ class TFlowControlledRequestQueue : public IActor {
if (reqActor) {
if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
TActivationContext::Send(
- new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
+ new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
);
}
reqActor->PassAway();
@@ -153,7 +153,7 @@ class TFlowControlledRequestQueue : public IActor {
PassAway();
}
public:
- TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config)
+ TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config)
: IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity)
, Target(target)
, Config(config)
@@ -208,7 +208,7 @@ void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPt
}
-IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) {
+IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) {
return new TFlowControlledRequestQueue(targetId, activity, config);
}
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.h b/library/cpp/actors/helpers/flow_controlled_queue.h
index d250405304..1d03226103 100644
--- a/library/cpp/actors/helpers/flow_controlled_queue.h
+++ b/library/cpp/actors/helpers/flow_controlled_queue.h
@@ -13,6 +13,6 @@ namespace NActors {
ui32 LatencyFactor = 4;
};
- IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity = IActor::ACTORLIB_COMMON, const TFlowControlledQueueConfig &config = TFlowControlledQueueConfig());
+ IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity = IActor::ACTORLIB_COMMON, const TFlowControlledQueueConfig &config = TFlowControlledQueueConfig());
}
diff --git a/library/cpp/actors/helpers/mon_histogram_helper.h b/library/cpp/actors/helpers/mon_histogram_helper.h
index a9a57e3823..80b9690a75 100644
--- a/library/cpp/actors/helpers/mon_histogram_helper.h
+++ b/library/cpp/actors/helpers/mon_histogram_helper.h
@@ -1,9 +1,9 @@
#pragma once
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-
-#include <util/string/cast.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <util/string/cast.h>
+
namespace NActors {
namespace NMon {
class THistogramCounterHelper {
@@ -79,7 +79,7 @@ namespace NActors {
ui64 FirstBucketVal;
ui64 BucketCount;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> BucketsHolder;
- TVector<NMonitoring::TDeprecatedCounter*> Buckets;
+ TVector<NMonitoring::TDeprecatedCounter*> Buckets;
};
}
diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h
index d7d07f9fa8..d1f320509e 100644
--- a/library/cpp/actors/helpers/selfping_actor.h
+++ b/library/cpp/actors/helpers/selfping_actor.h
@@ -1,7 +1,7 @@
#pragma once
#include <library/cpp/actors/core/actor.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NActors {
diff --git a/library/cpp/actors/helpers/ya.make b/library/cpp/actors/helpers/ya.make
index d8771179de..0169a2c727 100644
--- a/library/cpp/actors/helpers/ya.make
+++ b/library/cpp/actors/helpers/ya.make
@@ -14,7 +14,7 @@ SRCS(
PEERDIR(
library/cpp/actors/core
- library/cpp/monlib/dynamic_counters
+ library/cpp/monlib/dynamic_counters
)
END()
diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp
index 27c4eeb6f3..834fe47b73 100644
--- a/library/cpp/actors/http/http_cache.cpp
+++ b/library/cpp/actors/http/http_cache.cpp
@@ -16,7 +16,7 @@ namespace NHttp {
class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig {
public:
using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>;
- NActors::TActorId HttpProxyId;
+ NActors::TActorId HttpProxyId;
TGetCachePolicy GetCachePolicy;
static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
@@ -584,7 +584,7 @@ TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePoli
return policy;
}
-NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
+NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
}
diff --git a/library/cpp/actors/http/http_cache.h b/library/cpp/actors/http/http_cache.h
index ac38bdcac8..313c7bd266 100644
--- a/library/cpp/actors/http/http_cache.h
+++ b/library/cpp/actors/http/http_cache.h
@@ -19,7 +19,7 @@ struct TCachePolicy {
using TGetCachePolicy = std::function<TCachePolicy(const THttpRequest*)>;
-NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
+NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& policy = TCachePolicy());
diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp
index 36c6855d93..3a466006cd 100644
--- a/library/cpp/actors/http/http_proxy.cpp
+++ b/library/cpp/actors/http/http_proxy.cpp
@@ -1,5 +1,5 @@
#include <library/cpp/actors/core/events.h>
-#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
#include "http_proxy.h"
namespace NHttp {
@@ -8,7 +8,7 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
public:
IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller);
- TActorId acceptorId = ctx.Register(listeningSocket);
+ TActorId acceptorId = ctx.Register(listeningSocket);
ctx.Send(event->Forward(acceptorId));
Acceptors.emplace_back(acceptorId);
return listeningSocket;
@@ -16,7 +16,7 @@ public:
IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) {
IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller);
- TActorId connectionId = ctx.Register(connectionSocket);
+ TActorId connectionId = ctx.Register(connectionSocket);
Connections.emplace(connectionId);
return connectionSocket;
}
@@ -26,7 +26,7 @@ public:
Become(&THttpProxy::StateWork);
}
- THttpProxy(NMonitoring::TMetricRegistry& sensors)
+ THttpProxy(NMonitoring::TMetricRegistry& sensors)
: Sensors(sensors)
{}
@@ -49,10 +49,10 @@ protected:
void PassAway() override {
Send(Poller, new NActors::TEvents::TEvPoisonPill());
- for (const NActors::TActorId& connection : Connections) {
+ for (const NActors::TActorId& connection : Connections) {
Send(connection, new NActors::TEvents::TEvPoisonPill());
}
- for (const NActors::TActorId& acceptor : Acceptors) {
+ for (const NActors::TActorId& acceptor : Acceptors) {
Send(acceptor, new NActors::TEvents::TEvPoisonPill());
}
NActors::TActorBootstrapped<THttpProxy>::PassAway();
@@ -60,7 +60,7 @@ protected:
void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
TStringBuf url = event->Get()->Request->URL.Before('?');
- THashMap<TString, TActorId>::iterator it;
+ THashMap<TString, TActorId>::iterator it;
while (!url.empty()) {
it = Handlers.find(url);
if (it != Handlers.end()) {
@@ -204,8 +204,8 @@ protected:
PassAway();
}
- NActors::TActorId Poller;
- TVector<TActorId> Acceptors;
+ NActors::TActorId Poller;
+ TVector<TActorId> Acceptors;
struct THostEntry {
TSockAddrInet6 Address;
@@ -215,9 +215,9 @@ protected:
static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60);
THashMap<TString, THostEntry> Hosts;
- THashMap<TString, TActorId> Handlers;
- THashSet<TActorId> Connections; // outgoing
- NMonitoring::TMetricRegistry& Sensors;
+ THashMap<TString, TActorId> Handlers;
+ THashSet<TActorId> Connections; // outgoing
+ NMonitoring::TMetricRegistry& Sensors;
};
TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) {
@@ -240,7 +240,7 @@ TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingR
);
}
-NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) {
+NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) {
return new THttpProxy(sensors);
}
diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h
index afd0170997..97ea6fbd44 100644
--- a/library/cpp/actors/http/http_proxy.h
+++ b/library/cpp/actors/http/http_proxy.h
@@ -8,7 +8,7 @@
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/interconnect/poller_actor.h>
#include <library/cpp/dns/cache.h>
-#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/generic/variant.h>
#include "http.h"
#include "http_proxy_ssl.h"
@@ -25,7 +25,7 @@ struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig {
struct TEvHttpProxy {
enum EEv {
- EvAddListeningPort = EventSpaceBegin(NActors::TEvents::ES_HTTP),
+ EvAddListeningPort = EventSpaceBegin(NActors::TEvents::ES_HTTP),
EvConfirmListen,
EvRegisterHandler,
EvHttpIncomingRequest,
@@ -41,7 +41,7 @@ struct TEvHttpProxy {
EvEnd
};
- static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small.");
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small.");
struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> {
TIpPort Port;
@@ -71,9 +71,9 @@ struct TEvHttpProxy {
struct TEvRegisterHandler : NActors::TEventLocal<TEvRegisterHandler, EvRegisterHandler> {
TString Path;
- TActorId Handler;
+ TActorId Handler;
- TEvRegisterHandler(const TString& path, const TActorId& handler)
+ TEvRegisterHandler(const TString& path, const TActorId& handler)
: Path(path)
, Handler(handler)
{}
@@ -142,32 +142,32 @@ struct TEvHttpProxy {
struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> {
TString PeerAddress;
- TActorId ConnectionID;
+ TActorId ConnectionID;
- TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID)
+ TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID)
: PeerAddress(peerAddress)
, ConnectionID(connectionID)
{}
};
struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> {
- TActorId ConnectionID;
+ TActorId ConnectionID;
TDeque<THttpIncomingRequestPtr> RecycledRequests;
- TEvHttpConnectionClosed(const TActorId& connectionID)
+ TEvHttpConnectionClosed(const TActorId& connectionID)
: ConnectionID(connectionID)
{}
- TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests)
+ TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests)
: ConnectionID(connectionID)
, RecycledRequests(std::move(recycledRequests))
{}
};
struct TEvHttpAcceptorClosed : NActors::TEventLocal<TEvHttpAcceptorClosed, EvHttpAcceptorClosed> {
- TActorId ConnectionID;
+ TActorId ConnectionID;
- TEvHttpAcceptorClosed(const TActorId& connectionID)
+ TEvHttpAcceptorClosed(const TActorId& connectionID)
: ConnectionID(connectionID)
{}
};
@@ -218,16 +218,16 @@ struct TEvHttpProxy {
};
struct TEndpointInfo {
- TActorId Proxy;
- TActorId Owner;
+ TActorId Proxy;
+ TActorId Owner;
TString WorkerName;
bool Secure;
TSslHelpers::TSslHolder<SSL_CTX> SecureContext;
};
-NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors);
-NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller);
-NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller);
+NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors);
+NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller);
+NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller);
NActors::IActor* CreateIncomingConnectionActor(
const TEndpointInfo& endpoint,
TIntrusivePtr<TSocketDescriptor> socket,
diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp
index 9780541b71..95b07ffa84 100644
--- a/library/cpp/actors/http/http_proxy_acceptor.cpp
+++ b/library/cpp/actors/http/http_proxy_acceptor.cpp
@@ -7,15 +7,15 @@ namespace NHttp {
class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig {
public:
using TBase = NActors::TActor<TAcceptorActor>;
- const TActorId Owner;
- const TActorId Poller;
+ const TActorId Owner;
+ const TActorId Poller;
TIntrusivePtr<TSocketDescriptor> Socket;
NActors::TPollerToken::TPtr PollerToken;
- THashSet<TActorId> Connections;
+ THashSet<TActorId> Connections;
TDeque<THttpIncomingRequestPtr> RecycledRequests;
TEndpointInfo Endpoint;
- TAcceptorActor(const TActorId& owner, const TActorId& poller)
+ TAcceptorActor(const TActorId& owner, const TActorId& poller)
: NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
, Owner(owner)
, Poller(poller)
@@ -77,12 +77,12 @@ protected:
}
}
LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << bindAddress.ToString() << " - retrying...");
- ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release());
+ ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release());
}
void Die(const NActors::TActorContext& ctx) override {
ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID));
- for (const NActors::TActorId& connection : Connections) {
+ for (const NActors::TActorId& connection : Connections) {
ctx.Send(connection, new NActors::TEvents::TEvPoisonPill());
}
}
@@ -104,7 +104,7 @@ protected:
connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front()));
RecycledRequests.pop_front();
}
- NActors::TActorId connectionId = ctx.Register(connectionSocket);
+ NActors::TActorId connectionId = ctx.Register(connectionSocket);
ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
Connections.emplace(connectionId);
socket = new TSocketDescriptor();
@@ -128,7 +128,7 @@ protected:
}
};
-NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
+NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
return new TAcceptorActor(owner, poller);
}
diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp
index d9189dba8a..5bd4dd74b0 100644
--- a/library/cpp/actors/http/http_proxy_outgoing.cpp
+++ b/library/cpp/actors/http/http_proxy_outgoing.cpp
@@ -8,18 +8,18 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
public:
using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
using TSelf = TOutgoingConnectionActor<TSocketImpl>;
- const TActorId Owner;
- const TActorId Poller;
+ const TActorId Owner;
+ const TActorId Poller;
SocketAddressType Address;
TString Host;
- TActorId RequestOwner;
+ TActorId RequestOwner;
THttpOutgoingRequestPtr Request;
THttpIncomingResponsePtr Response;
TInstant LastActivity;
TDuration ConnectionTimeout = CONNECTION_TIMEOUT;
NActors::TPollerToken::TPtr PollerToken;
- TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller)
+ TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller)
: TBase(&TSelf::StateWaiting)
, Owner(owner)
, Poller(poller)
@@ -38,7 +38,7 @@ public:
void ReplyAndDie(const NActors::TActorContext& ctx) {
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")");
ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response));
- RequestOwner = TActorId();
+ RequestOwner = TActorId();
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
ctx.Send(Owner, sensors.Release());
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
@@ -49,7 +49,7 @@ public:
LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error);
if (RequestOwner) {
ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
- RequestOwner = TActorId();
+ RequestOwner = TActorId();
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
ctx.Send(Owner, sensors.Release());
Die(ctx);
@@ -287,7 +287,7 @@ protected:
}
};
-NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) {
+NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) {
if (secure) {
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller);
} else {
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index 4c922f8d0f..b21ceb550f 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -180,17 +180,17 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
- NMonitoring::TMetricRegistry sensors;
+ NMonitoring::TMetricRegistry sensors;
NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
- NActors::TActorId proxyId = actorSystem.Register(proxy);
- actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
+ NActors::TActorId proxyId = actorSystem.Register(proxy);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
actorSystem.DispatchEvents();
- NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
+ NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
- NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
+ NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
@@ -213,7 +213,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
- NMonitoring::TMetricRegistry sensors;
+ NMonitoring::TMetricRegistry sensors;
TString certificateContent = R"___(-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w
@@ -273,7 +273,7 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
certificateFile.Write(certificateContent.data(), certificateContent.size());
NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
- NActors::TActorId proxyId = actorSystem.Register(proxy);
+ NActors::TActorId proxyId = actorSystem.Register(proxy);
THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> add = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(port);
///////// https configuration
@@ -281,13 +281,13 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
add->CertificateFile = certificateFile.Name();
add->PrivateKeyFile = certificateFile.Name();
/////////
- actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true);
actorSystem.DispatchEvents();
- NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
+ NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
- NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
+ NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test");
actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
@@ -314,11 +314,11 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
NActors::TActorSystem actorSystem(setup);
actorSystem.Start();
NHttp::THttpProxy* incomingProxy = new NHttp::THttpProxy();
- NActors::TActorId incomingProxyId = actorSystem.Register(incomingProxy);
+ NActors::TActorId incomingProxyId = actorSystem.Register(incomingProxy);
actorSystem.Send(incomingProxyId, new NHttp::TEvHttpProxy::TEvAddListeningPort(13337));
NHttp::THttpProxy* outgoingProxy = new NHttp::THttpProxy();
- NActors::TActorId outgoingProxyId = actorSystem.Register(outgoingProxy);
+ NActors::TActorId outgoingProxyId = actorSystem.Register(outgoingProxy);
THolder<NHttp::THttpStaticStringRequest> httpRequest = MakeHolder<NHttp::THttpStaticStringRequest>("GET /test HTTP/1.1\r\n\r\n");
actorSystem.Send(outgoingProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest("[::]:13337", std::move(httpRequest)));
diff --git a/library/cpp/actors/http/ya.make b/library/cpp/actors/http/ya.make
index 7ce68b7a75..ade447be3f 100644
--- a/library/cpp/actors/http/ya.make
+++ b/library/cpp/actors/http/ya.make
@@ -26,7 +26,7 @@ PEERDIR(
library/cpp/actors/core
library/cpp/actors/interconnect
library/cpp/dns
- library/cpp/monlib/metrics
+ library/cpp/monlib/metrics
library/cpp/string_utils/quote
)
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 8a46ffd535..a7da62c3d7 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -107,29 +107,29 @@ namespace NActors {
struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
- TEvHandshakeAsk(const TActorId& self,
- const TActorId& peer,
+ TEvHandshakeAsk(const TActorId& self,
+ const TActorId& peer,
ui64 counter)
: Self(self)
, Peer(peer)
, Counter(counter)
{
}
- const TActorId Self;
- const TActorId Peer;
+ const TActorId Self;
+ const TActorId Peer;
const ui64 Counter;
};
struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck")
- TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
+ TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
: Self(self)
, NextPacket(nextPacket)
, Params(std::move(params))
{}
- const TActorId Self;
+ const TActorId Self;
const ui64 NextPacket;
const TSessionParams Params;
};
@@ -185,8 +185,8 @@ namespace NActors {
TEvHandshakeDone(
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
- const TActorId& peer,
- const TActorId& self,
+ const TActorId& peer,
+ const TActorId& self,
ui64 nextPacket,
TAutoPtr<TProgramInfo>&& programInfo,
TSessionParams params)
@@ -200,8 +200,8 @@ namespace NActors {
}
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
- const TActorId Peer;
- const TActorId Self;
+ const TActorId Peer;
+ const TActorId Self;
const ui64 NextPacket;
TAutoPtr<TProgramInfo> ProgramInfo;
const TSessionParams Params;
@@ -319,10 +319,10 @@ namespace NActors {
template <typename TContainer>
TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) {
- for (const TActorId& actorId : route) {
+ for (const TActorId& actorId : route) {
auto* hop = Record.AddHops();
if (actorId) {
- ActorIdToProto(actorId, hop->MutableNextHop());
+ ActorIdToProto(actorId, hop->MutableNextHop());
}
}
Record.SetId(id);
@@ -366,13 +366,13 @@ namespace NActors {
};
struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {
- TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
+ TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
: SessionID(sessionId)
, BufferSize(outputBufferSize)
{
}
- TActorId SessionID;
+ TActorId SessionID;
ui64 BufferSize;
};
diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h
index 225a5243fd..f052a6e92e 100644
--- a/library/cpp/actors/interconnect/interconnect.h
+++ b/library/cpp/actors/interconnect/interconnect.h
@@ -10,7 +10,7 @@ namespace NActors {
TString SelfAddress;
ui32 SelfPort;
- TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time)
+ TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time)
};
struct TInterconnectProxySetup: public TThrRefBase {
@@ -41,12 +41,12 @@ namespace NActors {
TIntrusivePtr<TInterconnectGlobalState> GlobalState;
- virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls
+ virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls
virtual TActorSetupCmd CreateAcceptor() = 0;
};
struct TNameserverSetup {
- TActorId ServiceID;
+ TActorId ServiceID;
TIntrusivePtr<TInterconnectGlobalState> GlobalState;
};
@@ -118,12 +118,12 @@ namespace NActors {
};
struct TNodeRegistrarSetup {
- TActorId ServiceID;
+ TActorId ServiceID;
TIntrusivePtr<TInterconnectGlobalState> GlobalState;
};
- TActorId GetNameserviceActorId();
+ TActorId GetNameserviceActorId();
/**
* Const table-lookup based name service
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index e4a0ae3cda..659a6a9e5c 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -1,6 +1,6 @@
#pragma once
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/event_load.h>
#include <library/cpp/actors/util/rope.h>
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index 285709a00c..81e0694da1 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -3,7 +3,7 @@
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/util/datetime.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/generic/map.h>
#include <util/generic/set.h>
@@ -63,7 +63,7 @@ namespace NActors {
typedef TMap<ui16, TChannelSettings> TChannelsConfig;
using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title,
- TActorSystem* actorSystem, const TActorId& actorId)>;
+ TActorSystem* actorSystem, const TActorId& actorId)>;
using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>;
@@ -71,13 +71,13 @@ namespace NActors {
bool orange, bool red, TActorSystem* actorSystem)>;
struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> {
- TActorId NameserviceId;
+ TActorId NameserviceId;
NMonitoring::TDynamicCounterPtr MonCounters;
std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
TChannelsConfig ChannelsConfig;
TInterconnectSettings Settings;
TRegisterMonPageCallback RegisterMonPage;
- TActorId DestructorId;
+ TActorId DestructorId;
std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize;
TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024;
TString ClusterUUID;
diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp
index 224160d4b4..e389e93688 100644
--- a/library/cpp/actors/interconnect/interconnect_counters.cpp
+++ b/library/cpp/actors/interconnect/interconnect_counters.cpp
@@ -619,11 +619,11 @@ namespace {
TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read");
for (const char *reason : TDisconnectReason::Reasons) {
- DisconnectByReason_[reason] = Metrics_->Rate(
- NMonitoring::MakeLabels({
- {"sensor", "interconnect.disconnect_reason"},
- {"reason", reason},
- }));
+ DisconnectByReason_[reason] = Metrics_->Rate(
+ NMonitoring::MakeLabels({
+ {"sensor", "interconnect.disconnect_reason"},
+ {"reason", reason},
+ }));
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 9ede998d8e..51d1e607bc 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -25,8 +25,8 @@ namespace NActors {
struct TInitialPacket {
struct {
- TActorId SelfVirtualId;
- TActorId PeerVirtualId;
+ TActorId SelfVirtualId;
+ TActorId PeerVirtualId;
ui64 NextPacket;
ui64 Version;
} Header;
@@ -34,7 +34,7 @@ namespace NActors {
TInitialPacket() = default;
- TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) {
+ TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) {
Header.SelfVirtualId = self;
Header.PeerVirtualId = peer;
Header.NextPacket = nextPacket;
@@ -79,8 +79,8 @@ namespace NActors {
private:
TInterconnectProxyCommon::TPtr Common;
- TActorId SelfVirtualId;
- TActorId PeerVirtualId;
+ TActorId SelfVirtualId;
+ TActorId PeerVirtualId;
ui32 PeerNodeId = 0;
ui64 NextPacketToPeer = 0;
TMaybe<ui64> NextPacketFromPeer; // will be obtained from incoming initial packet
@@ -102,7 +102,7 @@ namespace NActors {
return IActor::INTERCONNECT_HANDSHAKE;
}
- THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
+ THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
: TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
, Common(std::move(common))
@@ -377,7 +377,7 @@ namespace NActors {
// set up virtual self id to ensure peer will not drop our connection
char buf[12] = {'c', 'o', 'o', 'k', 'i', 'e', ' ', 'c', 'h', 'e', 'c', 'k'};
- SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12));
+ SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12));
bool success = true;
try {
@@ -401,7 +401,7 @@ namespace NActors {
request.SetProgramStartTime(0);
request.SetSerial(0);
request.SetReceiverNodeId(0);
- request.SetSenderActorId(TString());
+ request.SetSenderActorId(TString());
request.SetCookie(cookie);
request.SetDoCheckCookie(true);
SendExBlock(request, "SendExBlockDoCheckCookie");
@@ -419,7 +419,7 @@ namespace NActors {
}
// restore state
- SelfVirtualId = TActorId();
+ SelfVirtualId = TActorId();
std::swap(tempSocket, Socket);
std::swap(tempPollerToken, PollerToken);
return success;
@@ -455,7 +455,7 @@ namespace NActors {
request.SetProgramStartTime(Common->StartTime);
request.SetSerial(SelfVirtualId.LocalId());
request.SetReceiverNodeId(PeerNodeId);
- request.SetSenderActorId(SelfVirtualId.ToString());
+ request.SetSenderActorId(SelfVirtualId.ToString());
request.SetSenderHostName(Common->TechnicalSelfHostName);
request.SetReceiverHostName(PeerHostName);
@@ -519,7 +519,7 @@ namespace NActors {
ValidateClusterUUID(success, generateError);
ValidateVersionTag(success, generateError);
- const auto& s = success.GetSenderActorId();
+ const auto& s = success.GetSenderActorId();
PeerVirtualId.Parse(s.data(), s.size());
// recover flags
@@ -599,8 +599,8 @@ namespace NActors {
SendInitialPacket();
} else {
// peer wants a new session, clear fields and send initial packet
- SelfVirtualId = TActorId();
- PeerVirtualId = TActorId();
+ SelfVirtualId = TActorId();
+ PeerVirtualId = TActorId();
NextPacketToPeer = 0;
SendInitialPacket();
@@ -637,7 +637,7 @@ namespace NActors {
PeerHostName = request.GetSenderHostName();
// parse peer virtual id
- const auto& str = request.GetSenderActorId();
+ const auto& str = request.GetSenderActorId();
PeerVirtualId.Parse(str.data(), str.size());
// validate request
@@ -709,7 +709,7 @@ namespace NActors {
SendExBlock(record, "ExReply");
// extract sender actor id (self virtual id)
- const auto& str = success.GetSenderActorId();
+ const auto& str = success.GetSenderActorId();
SelfVirtualId.Parse(str.data(), str.size());
} else if (auto ev = reply->CastAsLocal<TEvHandshakeReplyError>()) {
// in case of error just send reply to the peer and terminate handshake
@@ -981,8 +981,8 @@ namespace NActors {
}
};
- IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
- const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
+ IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
+ const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
TSessionParams params) {
return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket,
std::move(peerHostName), std::move(params)));
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h
index b3c0db6c5d..7c5c25c3b8 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.h
+++ b/library/cpp/actors/interconnect/interconnect_handshake.h
@@ -15,8 +15,8 @@ namespace NActors {
using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>;
- IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
- const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
+ IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self,
+ const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
TSessionParams params);
IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket);
diff --git a/library/cpp/actors/interconnect/interconnect_impl.h b/library/cpp/actors/interconnect/interconnect_impl.h
index ee29e4d397..2ca0db8763 100644
--- a/library/cpp/actors/interconnect/interconnect_impl.h
+++ b/library/cpp/actors/interconnect/interconnect_impl.h
@@ -4,7 +4,7 @@
#include <library/cpp/actors/protos/interconnect.pb.h>
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/helpers/mon_histogram_helper.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NActors {
// resolve node info
diff --git a/library/cpp/actors/interconnect/interconnect_mon.cpp b/library/cpp/actors/interconnect/interconnect_mon.cpp
index cf924ccbf9..48823c5b0e 100644
--- a/library/cpp/actors/interconnect/interconnect_mon.cpp
+++ b/library/cpp/actors/interconnect/interconnect_mon.cpp
@@ -1,9 +1,9 @@
#include "interconnect_mon.h"
#include "interconnect_tcp_proxy.h"
-
-#include <library/cpp/json/json_value.h>
-#include <library/cpp/json/json_writer.h>
-#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <library/cpp/json/json_value.h>
+#include <library/cpp/json/json_writer.h>
+#include <library/cpp/monlib/service/pages/templates.h>
#include <openssl/ssl.h>
#include <openssl/pem.h>
@@ -14,7 +14,7 @@ namespace NInterconnect {
class TInterconnectMonActor : public TActor<TInterconnectMonActor> {
class TQueryProcessor : public TActorBootstrapped<TQueryProcessor> {
- const TActorId Sender;
+ const TActorId Sender;
const bool Json;
TMap<ui32, TInterconnectProxyTCP::TProxyStats> Stats;
ui32 PendingReplies = 0;
@@ -24,7 +24,7 @@ namespace NInterconnect {
return INTERCONNECT_MONACTOR;
}
- TQueryProcessor(const TActorId& sender, bool json)
+ TQueryProcessor(const TActorId& sender, bool json)
: Sender(sender)
, Json(json)
{}
diff --git a/library/cpp/actors/interconnect/interconnect_mon.h b/library/cpp/actors/interconnect/interconnect_mon.h
index 3fb26053fb..e78229a2c4 100644
--- a/library/cpp/actors/interconnect/interconnect_mon.h
+++ b/library/cpp/actors/interconnect/interconnect_mon.h
@@ -7,9 +7,9 @@ namespace NInterconnect {
NActors::IActor *CreateInterconnectMonActor(TIntrusivePtr<NActors::TInterconnectProxyCommon> common = nullptr);
- static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) {
+ static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) {
char s[12] = {'I', 'C', 'O', 'v', 'e', 'r', 'v', 'i', 'e', 'w', 0, 0};
- return NActors::TActorId(nodeId, TStringBuf(s, 12));
+ return NActors::TActorId(nodeId, TStringBuf(s, 12));
}
} // NInterconnect
diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
index 43419bf70d..c9f6f8b5dc 100644
--- a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
+++ b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp
@@ -79,8 +79,8 @@ namespace NActors {
return true;
}
- TActorId GetNameserviceActorId() {
- return TActorId(0, "namesvc");
+ TActorId GetNameserviceActorId() {
+ return TActorId(0, "namesvc");
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 0abe9fe659..b42ae8dffd 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -6,7 +6,7 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket,
+ TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket,
TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common,
std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed,
TDuration deadPeerTimeout, TSessionParams params)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 7e2d8ccb94..4191951abd 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -3,7 +3,7 @@
#include "interconnect_tcp_session.h"
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/monlib/service/pages/templates.h>
#include <util/system/getpid.h>
namespace NActors {
@@ -45,7 +45,7 @@ namespace NActors {
LOG_INFO_IC("ICP01", "ready to work");
}
- void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
+ void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
if (!DynamicPtr) {
// perform usual bootstrap for static nodes
sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
@@ -311,9 +311,9 @@ namespace NActors {
auto event = MakeHolder<TEvHandshakeReplyOK>();
auto* pb = event->Record.MutableSuccess();
- const TActorId virtualId = GenerateSessionVirtualId();
+ const TActorId virtualId = GenerateSessionVirtualId();
pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
- pb->SetSenderActorId(virtualId.ToString());
+ pb->SetSenderActorId(virtualId.ToString());
pb->SetProgramPID(GetPID());
pb->SetProgramStartTime(Common->StartTime);
pb->SetSerial(virtualId.LocalId());
@@ -536,14 +536,14 @@ namespace NActors {
SessionVirtualId.ToString().data());
Session = nullptr;
- SessionID = TActorId();
+ SessionID = TActorId();
// drop all pending events as we are closed
ProcessPendingSessionEvents();
// reset virtual ids as this session is terminated
- SessionVirtualId = TActorId();
- RemoteSessionVirtualId = TActorId();
+ SessionVirtualId = TActorId();
+ RemoteSessionVirtualId = TActorId();
if (Metrics) {
Metrics->IncSessionDeaths();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 023e5bd1ee..e5921134ed 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -4,7 +4,7 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/core/events.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include "interconnect_common.h"
#include "interconnect_counters.h"
@@ -70,7 +70,7 @@ namespace NActors {
}
void Bootstrap();
- void Registered(TActorSystem* sys, const TActorId& owner) override;
+ void Registered(TActorSystem* sys, const TActorId& owner) override;
private:
friend class TInterconnectSessionTCP;
@@ -366,7 +366,7 @@ namespace NActors {
// read only
TInterconnectProxyCommon::TPtr const Common;
- const TActorId& GetNameserviceId() const {
+ const TActorId& GetNameserviceId() const {
return Common->NameserviceId;
}
@@ -403,24 +403,24 @@ namespace NActors {
void DropSessionEvent(STATEFN_SIG);
TInterconnectSessionTCP* Session = nullptr;
- TActorId SessionID;
+ TActorId SessionID;
// virtual ids used during handshake to check if it is the connection
// for the same session or to find out the latest shandshake
// it's virtual because session actor apears after successfull handshake
- TActorId SessionVirtualId;
- TActorId RemoteSessionVirtualId;
+ TActorId SessionVirtualId;
+ TActorId RemoteSessionVirtualId;
- TActorId GenerateSessionVirtualId() {
+ TActorId GenerateSessionVirtualId() {
ICPROXY_PROFILED;
const ui64 localId = TlsActivationContext->ExecutorThread.ActorSystem->AllocateIDSpace(1);
- return NActors::TActorId(SelfId().NodeId(), 0, localId, 0);
+ return NActors::TActorId(SelfId().NodeId(), 0, localId, 0);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TActorId IncomingHandshakeActor;
+ TActorId IncomingHandshakeActor;
TInstant IncomingHandshakeActorFilledIn;
TInstant IncomingHandshakeActorReset;
TMaybe<ui64> LastSerialFromIncomingHandshake;
@@ -429,7 +429,7 @@ namespace NActors {
void DropIncomingHandshake(bool poison = true) {
ICPROXY_PROFILED;
- if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) {
+ if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) {
LOG_DEBUG_IC("ICP111", "dropped incoming handshake: %s poison: %s", actorId.ToString().data(),
poison ? "true" : "false");
if (poison) {
@@ -444,7 +444,7 @@ namespace NActors {
void DropOutgoingHandshake(bool poison = true) {
ICPROXY_PROFILED;
- if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) {
+ if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) {
LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(),
poison ? "true" : "false");
if (poison) {
@@ -477,12 +477,12 @@ namespace NActors {
SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection);
}
- void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
+ void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
THolder<IEventBase> event);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TActorId OutgoingHandshakeActor;
+ TActorId OutgoingHandshakeActor;
TInstant OutgoingHandshakeActorCreated;
TInstant OutgoingHandshakeActorReset;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index b95c994598..2c025dc389 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -23,7 +23,7 @@ namespace NActors {
}
}
- TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
+ TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h
index fc71073c2d..086fe26ab3 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h
@@ -34,7 +34,7 @@ namespace NActors {
}
}
- TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override;
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override;
void Die(const TActorContext& ctx) override;
@@ -50,8 +50,8 @@ namespace NActors {
TInterconnectProxyCommon::TPtr const ProxyCommonCtx;
};
- static inline TActorId MakeInterconnectListenerActorId(bool dynamic) {
+ static inline TActorId MakeInterconnectListenerActorId(bool dynamic) {
char x[12] = {'I', 'C', 'L', 'i', 's', 't', 'e', 'n', 'e', 'r', '/', dynamic ? 'D' : 'S'};
- return TActorId(0, TStringBuf(x, 12));
+ return TActorId(0, TStringBuf(x, 12));
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 2ded7f9f53..468e8bdd64 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -7,7 +7,7 @@
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/monlib/service/pages/templates.h>
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
@@ -474,7 +474,7 @@ namespace NActors {
if (ev->Sender == ReceiverId) {
const bool wasConnected(Socket);
LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
- ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
+ ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
if (wasConnected) {
// we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
// restart handshake process, closing our part of socket first
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 7fc00dbcc5..dfab4065c0 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -10,7 +10,7 @@
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/util/funnel_queue.h>
#include <library/cpp/actors/util/recentwnd.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <util/generic/queue.h>
@@ -179,7 +179,7 @@ namespace NActors {
return INTERCONNECT_SESSION_TCP;
}
- TInputSessionTCP(const TActorId& sessionId,
+ TInputSessionTCP(const TActorId& sessionId,
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
TIntrusivePtr<TReceiveContext> context,
TInterconnectProxyCommon::TPtr common,
@@ -495,7 +495,7 @@ namespace NActors {
void GenerateHttpInfo(TStringStream& str);
TIntrusivePtr<TReceiveContext> ReceiveContext;
- TActorId ReceiverId;
+ TActorId ReceiverId;
TDuration Ping;
ui64 ConfirmPacketsForcedBySize = 0;
@@ -513,7 +513,7 @@ namespace NActors {
: public TActorBootstrapped<TInterconnectSessionKiller> {
ui32 RepliesReceived = 0;
ui32 RepliesNumber = 0;
- TActorId LargestSession = TActorId();
+ TActorId LargestSession = TActorId();
ui64 MaxBufferSize = 0;
TInterconnectProxyCommon::TPtr Common;
@@ -529,7 +529,7 @@ namespace NActors {
void Bootstrap() {
auto sender = SelfId();
- const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
+ const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
auto ev = new TEvSessionBufferSizeRequest();
return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
};
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index 2a8443da71..22850b3126 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -72,7 +72,7 @@ namespace NInterconnect {
};
class TLoadResponderMasterActor : public TActorBootstrapped<TLoadResponderMasterActor> {
- TVector<TActorId> Slaves;
+ TVector<TActorId> Slaves;
ui32 SlaveIndex = 0;
STRICT_STFUNC(StateFunc,
@@ -93,7 +93,7 @@ namespace NInterconnect {
}
void Die(const TActorContext& ctx) override {
- for (const TActorId& actorId : Slaves) {
+ for (const TActorId& actorId : Slaves) {
ctx.Send(actorId, new TEvents::TEvPoisonPill);
}
TActorBootstrapped::Die(ctx);
@@ -122,9 +122,9 @@ namespace NInterconnect {
return new TLoadResponderMasterActor();
}
- TActorId MakeLoadResponderActorId(ui32 nodeId) {
+ TActorId MakeLoadResponderActorId(ui32 nodeId) {
char x[12] = {'I', 'C', 'L', 'o', 'a', 'd', 'R', 'e', 's', 'p', 'A', 'c'};
- return TActorId(nodeId, TStringBuf(x, 12));
+ return TActorId(nodeId, TStringBuf(x, 12));
}
class TLoadActor: public TActorBootstrapped<TLoadActor> {
@@ -144,8 +144,8 @@ namespace NInterconnect {
TInstant NextMessageTimestamp;
THashMap<TString, TMessageInfo> InFly;
ui64 NextId = 1;
- TVector<TActorId> Hops;
- TActorId FirstHop;
+ TVector<TActorId> Hops;
+ TActorId FirstHop;
ui64 NumDropped = 0;
std::shared_ptr<std::atomic_uint64_t> Traffic;
@@ -167,7 +167,7 @@ namespace NInterconnect {
Traffic = std::move(ev->Get()->Traffic);
for (const ui32 nodeId : Params.NodeHops) {
- const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId();
+ const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId();
if (!FirstHop) {
FirstHop = actorId;
} else {
diff --git a/library/cpp/actors/interconnect/load.h b/library/cpp/actors/interconnect/load.h
index 0a01a0dc04..060fa7641b 100644
--- a/library/cpp/actors/interconnect/load.h
+++ b/library/cpp/actors/interconnect/load.h
@@ -5,7 +5,7 @@
namespace NInterconnect {
// load responder -- lives on every node as a service actor
NActors::IActor* CreateLoadResponderActor();
- NActors::TActorId MakeLoadResponderActorId(ui32 node);
+ NActors::TActorId MakeLoadResponderActorId(ui32 node);
// load actor -- generates load with specific parameters
struct TLoadParams {
diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp
index 884503e602..1267920559 100644
--- a/library/cpp/actors/interconnect/mock/ic_mock.cpp
+++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp
@@ -42,7 +42,7 @@ namespace NActors {
: Key(key)
{}
- void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) {
+ void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) {
TPeerInfo *peer = GetPeer(nodeId);
auto guard = TWriteGuard(peer->Mutex);
Y_VERIFY(!peer->ActorSystem);
@@ -188,7 +188,7 @@ namespace NActors {
, Common(std::move(common))
{}
- void Registered(TActorSystem *as, const TActorId& parent) override {
+ void Registered(TActorSystem *as, const TActorId& parent) override {
TActor::Registered(as, parent);
State.Attach(NodeId, as, SelfId());
}
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 4ba50a2b5f..187d0b6bdf 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -18,7 +18,7 @@
using NActors::IEventBase;
using NActors::IEventHandle;
-using NActors::TActorId;
+using NActors::TActorId;
using NActors::TConstIoVec;
using NActors::TEventSerializedData;
@@ -91,8 +91,8 @@ union TTcpPacketBuf {
struct TEventDescr {
ui32 Type;
ui32 Flags;
- TActorId Recipient;
- TActorId Sender;
+ TActorId Recipient;
+ TActorId Sender;
ui64 Cookie;
// wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor
NWilson::TTraceId::TSerializedTraceId TraceId;
@@ -102,7 +102,7 @@ struct TEventDescr {
struct TEventHolder : TNonCopyable {
TEventDescr Descr;
- TActorId ForwardRecipient;
+ TActorId ForwardRecipient;
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
ui64 Serial;
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index e75cbcaef4..8c7b61a7a7 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -1,35 +1,35 @@
#include "poller_actor.h"
#include "interconnect_common.h"
-#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/probes.h>
-#include <library/cpp/actors/protos/services_common.pb.h>
+#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/actors/util/funnel_queue.h>
-
+
#include <util/generic/intrlist.h>
#include <util/system/thread.h>
#include <util/system/event.h>
#include <util/system/pipe.h>
#include <variant>
-
+
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- namespace {
+ namespace {
int LastSocketError() {
#if defined(_win_)
return WSAGetLastError();
#else
return errno;
#endif
- }
- }
-
+ }
+ }
+
struct TSocketRecord : TThrRefBase {
const TIntrusivePtr<TSharedDescriptor> Socket;
const TActorId ReadActorId;
@@ -57,7 +57,7 @@ namespace NActors {
: Socket(std::move(socket))
{}
};
-
+
using TPollerSyncOperation = std::variant<TPollerExitThread, TPollerWakeup, TPollerUnregisterSocket>;
struct TPollerSyncOperationWrapper {
@@ -149,7 +149,7 @@ namespace NActors {
bool DrainReadEnd() {
size_t totalRead = 0;
char buffer[4096];
- for (;;) {
+ for (;;) {
ssize_t n = ReadEnd.Read(buffer, sizeof(buffer));
if (n < 0) {
const int error = LastSocketError();
@@ -157,17 +157,17 @@ namespace NActors {
continue;
} else if (error == EAGAIN || error == EWOULDBLOCK) {
break;
- } else {
+ } else {
Y_FAIL("read() failed with %s", strerror(errno));
- }
+ }
} else {
Y_VERIFY(n);
totalRead += n;
- }
+ }
}
return totalRead;
}
-
+
bool ProcessSyncOpQueue() {
if (DrainReadEnd()) {
Y_VERIFY(!SyncOperationsQ.IsEmpty());
@@ -181,25 +181,25 @@ namespace NActors {
return false; // terminate the thread
} else if (std::get_if<TPollerWakeup>(&op->Operation)) {
op->SignalDone();
- } else {
+ } else {
Y_FAIL();
- }
+ }
} while (SyncOperationsQ.Pop());
- }
+ }
return true;
- }
-
+ }
+
void *ThreadProc() override {
SetCurrentThreadName("network poller");
while (ProcessSyncOpQueue()) {
static_cast<TDerived&>(*this).ProcessEventsInLoop();
- }
+ }
return nullptr;
- }
+ }
};
-
+
} // namespace NActors
-
+
#if defined(_linux_)
# include "poller_actor_linux.h"
#elif defined(_darwin_)
@@ -209,38 +209,38 @@ namespace NActors {
#else
# error "Unsupported platform"
#endif
-
+
namespace NActors {
-
+
class TPollerToken::TImpl {
std::weak_ptr<TPollerThread> Thread;
TIntrusivePtr<TSocketRecord> Record; // valid only when Thread is held locked
-
- public:
+
+ public:
TImpl(std::shared_ptr<TPollerThread> thread, TIntrusivePtr<TSocketRecord> record)
: Thread(thread)
, Record(std::move(record))
- {
+ {
thread->RegisterSocket(Record);
}
-
+
~TImpl() {
if (auto thread = Thread.lock()) {
thread->UnregisterSocket(Record);
- }
- }
-
+ }
+ }
+
void Request(bool read, bool write) {
if (auto thread = Thread.lock()) {
thread->Request(Record, read, write);
- }
- }
+ }
+ }
const TIntrusivePtr<TSharedDescriptor>& Socket() const {
return Record->Socket;
}
- };
-
+ };
+
class TPollerActor: public TActorBootstrapped<TPollerActor> {
// poller thread
std::shared_ptr<TPollerThread> PollerThread;
diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h
index f927b82089..5bd4f50704 100644
--- a/library/cpp/actors/interconnect/poller_actor.h
+++ b/library/cpp/actors/interconnect/poller_actor.h
@@ -55,9 +55,9 @@ namespace NActors {
IActor* CreatePollerActor();
- inline TActorId MakePollerActorId() {
+ inline TActorId MakePollerActorId() {
char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'};
- return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
}
}
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 565a511859..bbdabbd339 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
index e6b2bd4e4c..334859882f 100644
--- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
@@ -2,7 +2,7 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/interconnect/interconnect_common.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/interconnect/event_holder_pool.h>
#include <atomic>
diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp
index ba2a50c6f6..d67509f058 100644
--- a/library/cpp/actors/interconnect/ut/large.cpp
+++ b/library/cpp/actors/interconnect/ut/large.cpp
@@ -14,10 +14,10 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
using namespace NActors;
class TProducer: public TActorBootstrapped<TProducer> {
- const TActorId RecipientActorId;
+ const TActorId RecipientActorId;
public:
- TProducer(const TActorId& recipientActorId)
+ TProducer(const TActorId& recipientActorId)
: RecipientActorId(recipientActorId)
{}
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
class TConsumer : public TActorBootstrapped<TConsumer> {
TManualEvent& Done;
- TActorId SessionId;
+ TActorId SessionId;
public:
TConsumer(TManualEvent& done)
@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
TManualEvent done;
TConsumer* consumer = new TConsumer(done);
- const TActorId recp = testCluster.RegisterActor(consumer, 1);
+ const TActorId recp = testCluster.RegisterActor(consumer, 1);
testCluster.RegisterActor(new TProducer(recp), 2);
done.WaitI();
}
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
index 2b6d27cd3f..ac46180804 100644
--- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
+++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -70,7 +70,7 @@ public:
~TTestICCluster() {
}
- TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
+ TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
return Nodes[nodeId]->RegisterActor(actor);
}
@@ -78,7 +78,7 @@ public:
return Nodes[nodeId]->InterconnectProxy(peerNodeId);
}
- void KillActor(ui32 nodeId, const TActorId& id) {
+ void KillActor(ui32 nodeId, const TActorId& id) {
Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill);
}
};
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index ff30b1445e..59dd2554c8 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -62,7 +62,7 @@ public:
setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),
TMailboxType::ReadAsFilled, 0));
- const TActorId loggerActorId(0, "logger");
+ const TActorId loggerActorId(0, "logger");
constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
auto loggerSettings = MakeIntrusive<NLog::TSettings>(
@@ -114,7 +114,7 @@ public:
ActorSystem->Stop();
}
- bool Send(const TActorId& recipient, IEventBase* ev) {
+ bool Send(const TActorId& recipient, IEventBase* ev) {
return ActorSystem->Send(recipient, ev);
}
@@ -127,7 +127,7 @@ public:
}
void RegisterServiceActor(const TActorId& serviceId, IActor* actor) {
- const TActorId actorId = ActorSystem->Register(actor);
+ const TActorId actorId = ActorSystem->Register(actor);
ActorSystem->RegisterLocalService(serviceId, actorId);
}
diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h
index 7591200471..07fe10d93a 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_actors.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h
@@ -3,13 +3,13 @@
namespace NActors {
class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {
protected:
- const TActorId RecipientActorId;
+ const TActorId RecipientActorId;
const ui32 Preload;
ui64 SequenceNumber = 0;
ui32 InFlySize = 0;
public:
- TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
+ TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
: RecipientActorId(recipientActorId)
, Preload(preload)
{
diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
index 23d846a2fd..dbd05ce746 100644
--- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
@@ -1,38 +1,38 @@
-#include <library/cpp/actors/interconnect/poller_actor.h>
-#include <library/cpp/actors/testlib/test_runtime.h>
-
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
+
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/network/pair.h>
-#include <util/network/socket.h>
-
-using namespace NActors;
-
-class TTestSocket: public TSharedDescriptor {
-public:
- explicit TTestSocket(SOCKET fd)
- : Fd_(fd)
- {
- }
-
- int GetDescriptor() override {
- return Fd_;
- }
-
-private:
- SOCKET Fd_;
-};
-using TTestSocketPtr = TIntrusivePtr<TTestSocket>;
-
-// create pair of connected, non-blocking sockets
-std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() {
- SOCKET fds[2];
- SocketPair(fds);
- SetNonBlock(fds[0]);
- SetNonBlock(fds[1]);
- return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])};
-}
-
+
+#include <util/network/pair.h>
+#include <util/network/socket.h>
+
+using namespace NActors;
+
+class TTestSocket: public TSharedDescriptor {
+public:
+ explicit TTestSocket(SOCKET fd)
+ : Fd_(fd)
+ {
+ }
+
+ int GetDescriptor() override {
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
+};
+using TTestSocketPtr = TIntrusivePtr<TTestSocket>;
+
+// create pair of connected, non-blocking sockets
+std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() {
+ SOCKET fds[2];
+ SocketPair(fds);
+ SetNonBlock(fds[0]);
+ SetNonBlock(fds[1]);
+ return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])};
+}
+
std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() {
// create server (listening) socket
SOCKET server = socket(AF_INET, SOCK_STREAM, 0);
@@ -74,101 +74,101 @@ std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() {
return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted));
}
-class TPollerActorTest: public TTestBase {
- UNIT_TEST_SUITE(TPollerActorTest);
- UNIT_TEST(Registration)
- UNIT_TEST(ReadNotification)
- UNIT_TEST(WriteNotification)
- UNIT_TEST(HangupNotification)
- UNIT_TEST_SUITE_END();
-
-public:
- void SetUp() override {
- ActorSystem_ = MakeHolder<TTestActorRuntimeBase>();
- ActorSystem_->Initialize();
-
- PollerId_ = ActorSystem_->Register(CreatePollerActor());
-
- TDispatchOptions opts;
- opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
- ActorSystem_->DispatchEvents(opts);
- }
-
- void Registration() {
- auto [s1, s2] = NonBlockSockets();
- auto readerId = ActorSystem_->AllocateEdgeActor();
- auto writerId = ActorSystem_->AllocateEdgeActor();
-
+class TPollerActorTest: public TTestBase {
+ UNIT_TEST_SUITE(TPollerActorTest);
+ UNIT_TEST(Registration)
+ UNIT_TEST(ReadNotification)
+ UNIT_TEST(WriteNotification)
+ UNIT_TEST(HangupNotification)
+ UNIT_TEST_SUITE_END();
+
+public:
+ void SetUp() override {
+ ActorSystem_ = MakeHolder<TTestActorRuntimeBase>();
+ ActorSystem_->Initialize();
+
+ PollerId_ = ActorSystem_->Register(CreatePollerActor());
+
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
+ ActorSystem_->DispatchEvents(opts);
+ }
+
+ void Registration() {
+ auto [s1, s2] = NonBlockSockets();
+ auto readerId = ActorSystem_->AllocateEdgeActor();
+ auto writerId = ActorSystem_->AllocateEdgeActor();
+
RegisterSocket(s1, readerId, writerId);
-
- // reader should receive event after socket registration
+
+ // reader should receive event after socket registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId);
token = ev->Get()->PollerToken;
- }
-
- // writer should receive event after socket registration
- {
+ }
+
+ // writer should receive event after socket registration
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId);
UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken);
- }
- }
-
- void ReadNotification() {
- auto [r, w] = NonBlockSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ }
+ }
+
+ void ReadNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
RegisterSocket(r, clientId, {});
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
- char buf;
-
- // data not ready yet for read
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
- UNIT_ASSERT(errno == EWOULDBLOCK);
-
+ }
+
+ char buf;
+
+ // data not ready yet for read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+
// request read poll
token->Request(true, false);
- // write data
- UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1);
-
- // notification after socket become readable
- {
+ // write data
+ UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1);
+
+ // notification after socket become readable
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
UNIT_ASSERT(ev->Get()->Read);
UNIT_ASSERT(!ev->Get()->Write);
- }
-
- // read data
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1);
- UNIT_ASSERT_EQUAL('x', buf);
-
- // no more data to read
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
- UNIT_ASSERT(errno == EWOULDBLOCK);
- }
-
- void WriteNotification() {
+ }
+
+ // read data
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1);
+ UNIT_ASSERT_EQUAL('x', buf);
+
+ // no more data to read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+ }
+
+ void WriteNotification() {
auto [r, w] = TcpSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
SetNonBlock(w->GetDescriptor());
RegisterSocket(w, TActorId{}, clientId);
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
{
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
+ }
+
char buffer[4096];
memset(buffer, 'x', sizeof(buffer));
@@ -181,7 +181,7 @@ public:
written += res;
} else if (res == 0) {
UNIT_FAIL("unexpected zero return from send()");
- } else {
+ } else {
UNIT_ASSERT(res == -1);
if (errno == EINTR) {
continue;
@@ -191,10 +191,10 @@ public:
} else {
UNIT_FAIL("unexpected error from send()");
}
- }
- }
+ }
+ }
Cerr << "written " << written << " bytes" << Endl;
-
+
// read all written data from the read end
for (;;) {
char buffer[4096];
@@ -216,7 +216,7 @@ public:
}
}
}
-
+
// wait for notification after socket becomes writable again
{
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
@@ -224,41 +224,41 @@ public:
UNIT_ASSERT(!ev->Get()->Read);
UNIT_ASSERT(ev->Get()->Write);
}
- }
- }
-
- void HangupNotification() {
- auto [r, w] = NonBlockSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ }
+ }
+
+ void HangupNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
RegisterSocket(r, clientId, TActorId{});
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
+ }
+
token->Request(true, false);
ShutDown(w->GetDescriptor(), SHUT_RDWR);
-
+
// notification after peer shuts down its socket
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
UNIT_ASSERT(ev->Get()->Read);
- }
- }
-
-private:
+ }
+ }
+
+private:
void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) {
auto ev = new TEvPollerRegister{socket, readActorId, writeActorId};
- ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
- }
-
-private:
- THolder<TTestActorRuntimeBase> ActorSystem_;
- TActorId PollerId_;
-};
-
-UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest);
+ ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
+ }
+
+private:
+ THolder<TTestActorRuntimeBase> ActorSystem_;
+ TActorId PollerId_;
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest);
diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make
index 2f5b13352e..ec19f1a64a 100644
--- a/library/cpp/actors/interconnect/ut/ya.make
+++ b/library/cpp/actors/interconnect/ut/ya.make
@@ -15,11 +15,11 @@ ELSE()
ENDIF()
SRCS(
- channel_scheduler_ut.cpp
+ channel_scheduler_ut.cpp
event_holder_pool_ut.cpp
interconnect_ut.cpp
large.cpp
- poller_actor_ut.cpp
+ poller_actor_ut.cpp
dynamic_proxy_ut.cpp
)
@@ -28,7 +28,7 @@ PEERDIR(
library/cpp/actors/interconnect
library/cpp/actors/interconnect/ut/lib
library/cpp/actors/interconnect/ut/protos
- library/cpp/actors/testlib
+ library/cpp/actors/testlib
library/cpp/digest/md5
library/cpp/testing/unittest
)
diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp
index 5d19bc3003..69374cd080 100644
--- a/library/cpp/actors/interconnect/ut_fat/main.cpp
+++ b/library/cpp/actors/interconnect/ut_fat/main.cpp
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
ui16 SendFlags;
public:
- TSenderActor(const TActorId& recipientActorId, ui16 sendFlags)
+ TSenderActor(const TActorId& recipientActorId, ui16 sendFlags)
: TSenderBaseActor(recipientActorId, 32)
, SendFlags(sendFlags)
{
@@ -108,7 +108,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
- const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
+ const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
TSenderActor* senderActor = new TSenderActor(recipient, flags);
testCluster.RegisterActor(senderActor, 1);
@@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
- const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
+ const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
TSenderActor* senderActor = new TSenderActor(recipient, flags);
testCluster.RegisterActor(senderActor, 1);
diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make
index 60d29b0fc0..9e4fb46fdb 100644
--- a/library/cpp/actors/interconnect/ya.make
+++ b/library/cpp/actors/interconnect/ya.make
@@ -75,18 +75,18 @@ PEERDIR(
contrib/libs/libc_compat
contrib/libs/openssl
library/cpp/actors/core
- library/cpp/actors/dnscachelib
+ library/cpp/actors/dnscachelib
library/cpp/actors/dnsresolver
library/cpp/actors/helpers
library/cpp/actors/prof
library/cpp/actors/protos
library/cpp/actors/util
library/cpp/digest/crc32c
- library/cpp/json
+ library/cpp/json
library/cpp/lwtrace
- library/cpp/monlib/dynamic_counters
+ library/cpp/monlib/dynamic_counters
library/cpp/monlib/metrics
- library/cpp/monlib/service/pages/tablesorter
+ library/cpp/monlib/service/pages/tablesorter
library/cpp/openssl/init
library/cpp/packedtypes
)
diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto
index 5fbd6d44ee..8155535f1f 100644
--- a/library/cpp/actors/protos/actors.proto
+++ b/library/cpp/actors/protos/actors.proto
@@ -2,12 +2,12 @@ package NActorsProto;
option java_package = "ru.yandex.kikimr.proto";
option java_outer_classname = "NActorsBaseProto";
-message TActorId {
+message TActorId {
required fixed64 RawX1 = 1;
required fixed64 RawX2 = 2;
}
message TCallbackException {
- required TActorId ActorId = 1;
+ required TActorId ActorId = 1;
required string ExceptionMessage = 2;
}
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 2e3b0d0d15..8656c8ea1f 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -51,7 +51,7 @@ message THandshakeRequest {
required uint64 Serial = 4;
required uint32 ReceiverNodeId = 5;
- required string SenderActorId = 6;
+ required string SenderActorId = 6;
optional string SenderHostName = 7;
optional string ReceiverHostName = 8;
@@ -81,7 +81,7 @@ message THandshakeSuccess {
required uint64 ProgramStartTime = 3;
required uint64 Serial = 4;
- required string SenderActorId = 5;
+ required string SenderActorId = 5;
optional string VersionTag = 6;
repeated string AcceptedVersionTags = 7;
@@ -104,7 +104,7 @@ message THandshakeReply {
message TEvLoadMessage {
message THop {
- optional NActorsProto.TActorId NextHop = 1; // if zero, then the payload is trimmed out of the message
+ optional NActorsProto.TActorId NextHop = 1; // if zero, then the payload is trimmed out of the message
}
repeated THop Hops = 1; // the route for the message
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 6fa25b9965..cda6980b1e 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -295,17 +295,17 @@ namespace NActors {
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
DoSchedule(deadline, ev, cookie, workerId);
- }
-
+ }
+
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId);
}
void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
- TInstant deadline = Runtime->GetTimeProvider()->Now() + delay;
+ TInstant deadline = Runtime->GetTimeProvider()->Now() + delay;
DoSchedule(deadline, ev, cookie, workerId);
- }
-
+ }
+
void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
Y_UNUSED(workerId);
@@ -319,13 +319,13 @@ namespace NActors {
Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
PrintEvent(ev, Runtime);
}
-
- auto now = Runtime->GetTimeProvider()->Now();
- if (deadline < now) {
- deadline = now; // avoid going backwards in time
- }
- TDuration delay = (deadline - now);
-
+
+ auto now = Runtime->GetTimeProvider()->Now();
+ if (deadline < now) {
+ deadline = now; // avoid going backwards in time
+ }
+ TDuration delay = (deadline - now);
+
if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
@@ -336,9 +336,9 @@ namespace NActors {
if (cookie) {
cookie->Detach();
}
- if (verbose) {
+ if (verbose) {
Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
- }
+ }
}
}
@@ -366,8 +366,8 @@ namespace NActors {
ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
- const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
- TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
+ const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
+ TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
if (ev->GetRecipientRewrite() == logger) {
TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
@@ -403,7 +403,7 @@ namespace NActors {
}
TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
- const TActorId& parentId) override {
+ const TActorId& parentId) override {
return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
}
@@ -486,7 +486,7 @@ namespace NActors {
}
void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
- const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
+ const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */,
NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0);
node->LogSettings->SetAllowDrop(false);
@@ -579,7 +579,7 @@ namespace NActors {
}
- void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
+ void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
runtime.ScheduleWhiteList.insert(actorId);
runtime.ScheduleWhiteListParent[actorId] = parentId;
@@ -640,7 +640,7 @@ namespace NActors {
TInstant time = scheduledEvents.begin()->Deadline;
while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
- static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
+ static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
auto& item = *scheduledEvents.begin();
TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
eventTypes[std::make_pair(item.Event->Recipient, name)]++;
@@ -725,7 +725,7 @@ namespace NActors {
VERBOSE = verbose;
}
- void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
+ void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
Y_VERIFY(!IsInitialized);
Y_VERIFY(nodeIndex < NodeCount);
auto node = Nodes[nodeIndex + FirstNodeId];
@@ -857,8 +857,8 @@ namespace NActors {
return (*TmpDir)();
}
- TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
- ui64 revolvingCounter, const TActorId& parentId) {
+ TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
+ ui64 revolvingCounter, const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
TGuard<TMutex> guard(Mutex);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
@@ -897,7 +897,7 @@ namespace NActors {
mailbox->AttachActor(localActorId, actor);
// do init
- const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
+ const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
@@ -925,8 +925,8 @@ namespace NActors {
return actorId;
}
- TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
- const TActorId& parentId) {
+ TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
+ const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
TGuard<TMutex> guard(Mutex);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
@@ -941,7 +941,7 @@ namespace NActors {
}
mailbox->AttachActor(localActorId, actor);
- const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
+ const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
@@ -949,7 +949,7 @@ namespace NActors {
return actorId;
}
- TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
+ TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
@@ -959,13 +959,13 @@ namespace NActors {
node->ActorToActorId[actor] = actorId;
}
- return node->ActorSystem->RegisterLocalService(serviceId, actorId);
+ return node->ActorSystem->RegisterLocalService(serviceId, actorId);
}
- TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
+ TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
- TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
+ TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
EdgeActors.insert(edgeActor);
EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor;
return edgeActor;
@@ -1414,14 +1414,14 @@ namespace NActors {
return it->second;
}
- TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
+ TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
return node->ActorSystem->LookupLocalService(serviceId);
}
- void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
+ void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
TGuard<TMutex> guard(Mutex);
ui32 dispatchCount = 0;
if (!edgeFilter.empty()) {
@@ -1429,7 +1429,7 @@ namespace NActors {
Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
}
}
- const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
+ const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
for (;;) {
for (auto edgeActor : edgeActors) {
@@ -1460,7 +1460,7 @@ namespace NActors {
}
}
- TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
+ TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndexFrom < NodeCount);
Y_VERIFY(nodeIndexTo < NodeCount);
@@ -1469,7 +1469,7 @@ namespace NActors {
return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
}
- void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
+ void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
TGuard<TMutex> guard(Mutex);
BlockedOutput.insert(actorId);
}
@@ -1480,7 +1480,7 @@ namespace NActors {
DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
}
- IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
+ IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
TGuard<TMutex> guard(Mutex);
if (nodeIndex == Max<ui32>()) {
Y_VERIFY(actorId.NodeId());
@@ -1494,7 +1494,7 @@ namespace NActors {
return FindActor(actorId, node);
}
- void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
+ void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
TGuard<TMutex> guard(Mutex);
if (allow) {
if (VERBOSE) {
@@ -1509,7 +1509,7 @@ namespace NActors {
}
}
- bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
+ bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
TGuard<TMutex> guard(Mutex);
return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
}
@@ -1570,7 +1570,7 @@ namespace NActors {
IActor* recipientActor = mailbox->FindActor(recipientLocalId);
if (recipientActor) {
// Save actorId by value in order to prevent ctx from being invalidated during another Send call.
- TActorId actorId = ev->GetRecipientRewrite();
+ TActorId actorId = ev->GetRecipientRewrite();
node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
TActivationContext *prevTlsActivationContext = TlsActivationContext;
@@ -1585,7 +1585,7 @@ namespace NActors {
recipientActor->Receive(evHolder, ctx);
node->ExecutorThread->DropUnregistered();
}
- CurrentRecipient = TActorId();
+ CurrentRecipient = TActorId();
TlsActivationContext = prevTlsActivationContext;
} else {
if (VERBOSE) {
@@ -1599,7 +1599,7 @@ namespace NActors {
}
}
- IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
+ IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
ui32 mailboxHint = actorId.Hint();
ui64 localId = actorId.LocalId();
TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
@@ -1644,7 +1644,7 @@ namespace NActors {
setup->LocalServices = node->LocalServices;
setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
- const TActorId nameserviceId = GetNameserviceActorId();
+ const TActorId nameserviceId = GetNameserviceActorId();
TIntrusivePtr<TInterconnectProxyCommon> common;
common.Reset(new TInterconnectProxyCommon);
@@ -1688,7 +1688,7 @@ namespace NActors {
NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
- std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
+ std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
setup->LocalServices.push_back(loggerActorPair);
}
@@ -1732,7 +1732,7 @@ namespace NActors {
Mailboxes.erase(mboxId);
}
- TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const {
+ TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const {
auto it = ActorNames.find(actorId);
if (it != ActorNames.end())
return it->second;
@@ -1773,7 +1773,7 @@ namespace NActors {
return TEST_ACTOR_RUNTIME;
}
- TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
+ TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker)
: Delegatee(delegatee)
@@ -1813,7 +1813,7 @@ namespace NActors {
STFUNC(Reply) {
Y_VERIFY(!HasReply);
IEventHandle *requestEv = Context->Queue->Head();
- TActorId originalSender = requestEv->Sender;
+ TActorId originalSender = requestEv->Sender;
HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
if (HasReply) {
delete Context->Queue->Pop();
@@ -1857,11 +1857,11 @@ namespace NActors {
return forwardedEv;
}
private:
- const TActorId Delegatee;
+ const TActorId Delegatee;
const bool IsSync;
- const TVector<TActorId> AdditionalActors;
+ const TVector<TActorId> AdditionalActors;
TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
- TActorId ReplyId;
+ TActorId ReplyId;
bool HasReply;
TDispatchOptions DelegateeOptions;
TTestActorRuntimeBase* Runtime;
@@ -1882,7 +1882,7 @@ namespace NActors {
{
}
- IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
+ IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
CreateReplyChecker);
}
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 26e3b45c98..b7e8edd1c5 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -199,7 +199,7 @@ namespace NActors {
typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector;
typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter;
typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter;
- typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver;
+ typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver;
TTestActorRuntimeBase(THeSingleSystemEnv);
@@ -213,7 +213,7 @@ namespace NActors {
static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline);
- static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId);
+ static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId);
TEventObserver SetObserverFunc(TEventObserver observerFunc);
TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc);
TEventFilter SetEventFilter(TEventFilter filterFunc);
@@ -232,20 +232,20 @@ namespace NActors {
TInstant GetCurrentTime() const;
void UpdateCurrentTime(TInstant newTime);
void AdvanceCurrentTime(TDuration duration);
- void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0);
+ void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0);
virtual void Initialize();
ui32 GetNodeId(ui32 index = 0) const;
ui32 GetNodeCount() const;
ui64 AllocateLocalId();
ui32 InterconnectPoolId() const;
TString GetTempDir();
- TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0,
+ TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0,
TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0,
- const TActorId& parentid = TActorId());
- TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
- const TActorId& parentid = TActorId());
- TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0);
- TActorId AllocateEdgeActor(ui32 nodeIndex = 0);
+ const TActorId& parentid = TActorId());
+ TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
+ const TActorId& parentid = TActorId());
+ TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0);
+ TActorId AllocateEdgeActor(ui32 nodeIndex = 0);
TEventsList CaptureEvents();
TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId);
TScheduledEventsList CaptureScheduledEvents();
@@ -260,13 +260,13 @@ namespace NActors {
void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0);
void ClearCounters();
ui64 GetCounter(ui32 evType) const;
- TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0);
- void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max());
- TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
- void BlockOutputForActor(const TActorId& actorId);
- IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
- void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
- bool IsScheduleForActorEnabled(const TActorId& actorId) const;
+ TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0);
+ void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max());
+ TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
+ void BlockOutputForActor(const TActorId& actorId);
+ IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
+ void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
+ bool IsScheduleForActorEnabled(const TActorId& actorId) const;
TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
void SetupMonitoring();
@@ -317,7 +317,7 @@ namespace NActors {
template<class TEvent>
typename TEvent::TPtr GrabEdgeEventIf(
- const TSet<TActorId>& edgeFilter,
+ const TSet<TActorId>& edgeFilter,
const std::function<bool(const typename TEvent::TPtr&)>& predicate,
TDuration simTimeout = TDuration::Max())
{
@@ -345,11 +345,11 @@ namespace NActors {
template<class TEvent>
typename TEvent::TPtr GrabEdgeEventIf(
- const TActorId& edgeActor,
+ const TActorId& edgeActor,
const std::function<bool(const typename TEvent::TPtr&)>& predicate,
TDuration simTimeout = TDuration::Max())
{
- TSet<TActorId> edgeFilter{edgeActor};
+ TSet<TActorId> edgeFilter{edgeActor};
return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout);
}
@@ -368,13 +368,13 @@ namespace NActors {
}
template<class TEvent>
- typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
+ typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
return GrabEdgeEventIf<TEvent>(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout);
}
template<class TEvent>
- typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
- TSet<TActorId> edgeFilter{edgeActor};
+ typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
+ TSet<TActorId> edgeFilter{edgeActor};
return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout);
}
@@ -409,7 +409,7 @@ namespace NActors {
}
template<class TEvent>
- typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
+ typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
try {
return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout);
} catch (...) {
@@ -418,7 +418,7 @@ namespace NActors {
}
template<class TEvent>
- typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
+ typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) {
try {
return GrabEdgeEvent<TEvent>(edgeActor, simTimeout);
} catch (...) {
@@ -462,7 +462,7 @@ namespace NActors {
}
void SetDispatcherRandomSeed(TInstant time, ui64 iteration);
- TString GetActorName(const TActorId& actorId) const;
+ TString GetActorName(const TActorId& actorId) const;
const TVector<ui64>& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; }
void SetTxAllocatorTabletIds(const TVector<ui64>& ids) { TxAllocatorTabletIds = ids; }
@@ -493,7 +493,7 @@ namespace NActors {
}
private:
- IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const;
+ IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const;
void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem);
TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint);
void ClearMailbox(ui32 nodeId, ui32 hint);
@@ -526,7 +526,7 @@ namespace NActors {
ui64 DispatchCyclesCount;
ui64 DispatchedEventsCount;
ui64 DispatchedEventsLimit = 2'500'000;
- TActorId CurrentRecipient;
+ TActorId CurrentRecipient;
ui64 DispatcherRandomSeed;
TIntrusivePtr<IRandomProvider> DispatcherRandomProvider;
TAutoPtr<TLogBackend> LogBackend;
@@ -559,9 +559,9 @@ namespace NActors {
TIntrusivePtr<NInterconnect::TPollerThreads> Poller;
volatile ui64* ActorSystemTimestamp;
volatile ui64* ActorSystemMonotonic;
- TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices;
- TMap<TActorId, IActor*> LocalServicesActors;
- TMap<IActor*, TActorId> ActorToActorId;
+ TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices;
+ TMap<TActorId, IActor*> LocalServicesActors;
+ TMap<IActor*, TActorId> ActorToActorId;
THolder<TMailboxTable> MailboxTable;
std::shared_ptr<void> AppData0;
THolder<TActorSystem> ActorSystem;
@@ -613,8 +613,8 @@ namespace NActors {
TProgramShouldContinue ShouldContinue;
TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes;
ui64 CurrentTimestamp;
- TSet<TActorId> EdgeActors;
- THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox;
+ TSet<TActorId> EdgeActors;
+ THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox;
TDuration DispatchTimeout;
TDuration ReschedulingDelay;
TEventObserver ObserverFunc;
@@ -622,10 +622,10 @@ namespace NActors {
TEventFilter EventFilterFunc;
TScheduledEventFilter ScheduledEventFilterFunc;
TRegistrationObserver RegistrationObserver;
- TSet<TActorId> BlockedOutput;
- TSet<TActorId> ScheduleWhiteList;
- THashMap<TActorId, TActorId> ScheduleWhiteListParent;
- THashMap<TActorId, TString> ActorNames;
+ TSet<TActorId> BlockedOutput;
+ TSet<TActorId> ScheduleWhiteList;
+ THashMap<TActorId, TActorId> ScheduleWhiteListParent;
+ THashMap<TActorId, TString> ActorNames;
TDispatchContext* CurrentDispatchContext;
TVector<ui64> TxAllocatorTabletIds;
@@ -686,7 +686,7 @@ namespace NActors {
class IStrandingDecoratorFactory {
public:
virtual ~IStrandingDecoratorFactory() {}
- virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0;
+ virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0;
};
struct IReplyChecker {