diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:03 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:03 +0300 |
commit | 2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/actors | |
parent | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff) | |
download | ydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors')
97 files changed, 871 insertions, 871 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index c84b8d7e2f..6f9ba6a42b 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -7,7 +7,7 @@ namespace NActors { Y_POD_THREAD(TActivationContext*) TlsActivationContext((TActivationContext*)nullptr); - bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { + bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { return Send(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); } @@ -15,7 +15,7 @@ namespace NActors { return ExecutorThread.Send(ev); } - void IActor::Registered(TActorSystem* sys, const TActorId& owner) { + void IActor::Registered(TActorSystem* sys, const TActorId& owner) { // fallback to legacy method, do not use it anymore if (auto eh = AfterRegister(SelfId(), owner)) sys->Send(eh); @@ -25,7 +25,7 @@ namespace NActors { SelfActorId.Out(out); } - bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept { + bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept { return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId)); } @@ -33,19 +33,19 @@ namespace NActors { return TlsActivationContext->ExecutorThread.Send(ev); } - void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { - TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); - } - + void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); + } + void TActivationContext::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); } void TActivationContext::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { - TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie); + TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie); } - bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { + bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { return TActivationContext::Send(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); } @@ -75,11 +75,11 @@ namespace NActors { return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId.Hint(), SelfActorId); } - TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) { + TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) { return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, parentId); } - TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) { + TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) { return TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(destinationNodeId); } @@ -95,36 +95,36 @@ namespace NActors { return NHPTimer::GetSeconds(GetCurrentEventTicks()); } - TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const { + TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const { return ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfID); } - TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept { + TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept { return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId); } - void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); - } - + void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const { + ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); + } + void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const { ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); } void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie); + ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie); + } + + void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { + TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } - void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); - } - void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie); + TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } TInstant TActivationContext::Now() { diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 8bfef6b5bd..ed29bd14b9 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -36,17 +36,17 @@ namespace NActors { public: static bool Send(TAutoPtr<IEventHandle> ev); - - /** - * Schedule one-shot event that will be send at given time point in the future. - * + + /** + * Schedule one-shot event that will be send at given time point in the future. + * * @param deadline the wallclock time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); - - /** + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + + /** * Schedule one-shot event that will be send at given time point in the future. * * @param deadline the monotonic time point in future when event must be send @@ -56,12 +56,12 @@ namespace NActors { static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); /** - * Schedule one-shot event that will be send after given delay. - * - * @param delta the time from now to delay event sending - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ + * Schedule one-shot event that will be send after given delay. + * + * @param delta the time from now to delay event sending + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); static TInstant Now(); @@ -79,9 +79,9 @@ namespace NActors { static TActorId RegisterWithSameMailbox(IActor* actor, TActorId parentId); static const TActorContext& AsActorContext(); - static TActorContext ActorContextFor(TActorId id); + static TActorContext ActorContextFor(TActorId id); - static TActorId InterconnectProxy(ui32 nodeid); + static TActorId InterconnectProxy(ui32 nodeid); static TActorSystem* ActorSystem(); static i64 GetCurrentEventTicks(); @@ -89,34 +89,34 @@ namespace NActors { }; struct TActorContext: public TActivationContext { - const TActorId SelfID; + const TActorId SelfID; - explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID) + explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID) : TActivationContext(mailbox, executorThread, eventStart) , SelfID(selfID) { } - bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; template <typename TEvent> - bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { + bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId)); } bool Send(TAutoPtr<IEventHandle> ev) const; - + TInstant Now() const; TMonotonic Monotonic() const; - /** - * Schedule one-shot event that will be send at given time point in the future. - * + /** + * Schedule one-shot event that will be send at given time point in the future. + * * @param deadline the wallclock time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; - - /** + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; + + /** * Schedule one-shot event that will be send at given time point in the future. * * @param deadline the monotonic time point in future when event must be send @@ -126,20 +126,20 @@ namespace NActors { void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; /** - * Schedule one-shot event that will be send after given delay. - * - * @param delta the time from now to delay event sending - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ + * Schedule one-shot event that will be send after given delay. + * + * @param delta the time from now to delay event sending + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; - - TActorContext MakeFor(const TActorId& otherId) const { + + TActorContext MakeFor(const TActorId& otherId) const { return TActorContext(Mailbox, ExecutorThread, EventStart, otherId); } // register new actor in ActorSystem on new fresh mailbox. - TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const; + TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const; // Register new actor in ActorSystem on same _mailbox_ as current actor. // There is one thread per mailbox to execute actor, which mean @@ -153,17 +153,17 @@ namespace NActors { extern Y_POD_THREAD(TActivationContext*) TlsActivationContext; - struct TActorIdentity: public TActorId { - explicit TActorIdentity(TActorId actorId) - : TActorId(actorId) + struct TActorIdentity: public TActorId { + explicit TActorIdentity(TActorId actorId) + : TActorId(actorId) { } - void operator=(TActorId actorId) { + void operator=(TActorId actorId) { *this = TActorIdentity(actorId); } - bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; @@ -174,18 +174,18 @@ namespace NActors { class IActorOps : TNonCopyable { public: virtual void Describe(IOutputStream&) const noexcept = 0; - virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0; - - /** - * Schedule one-shot event that will be send at given time point in the future. - * + virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0; + + /** + * Schedule one-shot event that will be send at given time point in the future. + * * @param deadline the wallclock time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; - - /** + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; + + /** * Schedule one-shot event that will be send at given time point in the future. * * @param deadline the monotonic time point in future when event must be send @@ -195,15 +195,15 @@ namespace NActors { virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; /** - * Schedule one-shot event that will be send after given delay. - * - * @param delta the time from now to delay event sending - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; - - virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0; + * Schedule one-shot event that will be send after given delay. + * + * @param delta the time from now to delay event sending + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; + + virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0; virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0; }; @@ -219,7 +219,7 @@ namespace NActors { i64 ElapsedTicks; ui64 HandledEvents; - friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&); + friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&); friend class TDecorator; public: @@ -254,7 +254,7 @@ namespace NActors { protected: IActor(TReceiveFunc stateFunc, ui32 activityType = OTHER) : StateFunc(stateFunc) - , SelfActorId(TActorId()) + , SelfActorId(TActorId()) , ElapsedTicks(0) , HandledEvents(0) , ActivityType(activityType) @@ -310,7 +310,7 @@ namespace NActors { InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) { struct TRecurseContext : TActorContext { TActivationContext *Prev; - TRecurseContext(const TActorId& actorId) + TRecurseContext(const TActorId& actorId) : TActorContext(TActivationContext::ActorContextFor(actorId)) , Prev(TlsActivationContext) { @@ -323,9 +323,9 @@ namespace NActors { return (actor.*method)(std::forward<TArgs>(args)...); } - virtual void Registered(TActorSystem* sys, const TActorId& owner); + virtual void Registered(TActorSystem* sys, const TActorId& owner); - virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) { + virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) { Y_UNUSED(self); Y_UNUSED(parentId); return TAutoPtr<IEventHandle>(); @@ -350,23 +350,23 @@ namespace NActors { protected: void Describe(IOutputStream&) const noexcept override; - bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; template <typename TEvent> - bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{ + bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{ return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId)); } - + template <class TEvent, class ... TEventArgs> bool Send(TActorId recipient, TEventArgs&& ... args) const { return Send(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...)); } - void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; + void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; // register new actor in ActorSystem on new fresh mailbox. - TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final; + TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final; // Register new actor in ActorSystem on same _mailbox_ as current actor. // There is one thread per mailbox to execute actor, which mean @@ -454,7 +454,7 @@ namespace NActors { return *static_cast<TActorContext*>(tls); } - inline TActorContext TActivationContext::ActorContextFor(TActorId id) { + inline TActorContext TActivationContext::ActorContextFor(TActorId id) { auto& tls = *TlsActivationContext; return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id); } diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h index 25e6b3fbe2..a37887c939 100644 --- a/library/cpp/actors/core/actor_bootstrapped.h +++ b/library/cpp/actors/core/actor_bootstrapped.h @@ -9,7 +9,7 @@ namespace NActors { template<typename TDerived> class TActorBootstrapped : public TActor<TDerived> { protected: - TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0); } @@ -19,11 +19,11 @@ namespace NActors { TDerived& self = static_cast<TDerived&>(*this); if constexpr (std::is_invocable_v<T, TDerived, const TActorContext&>) { self.Bootstrap(ctx); - } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&, const TActorContext&>) { + } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&, const TActorContext&>) { self.Bootstrap(ev->Sender, ctx); } else if constexpr (std::is_invocable_v<T, TDerived>) { self.Bootstrap(); - } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&>) { + } else if constexpr (std::is_invocable_v<T, TDerived, const TActorId&>) { self.Bootstrap(ev->Sender); } else { static_assert(dependent_false<TDerived>::value, "No correct Bootstrap() signature"); diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index af7328d3a1..6bcb768eaf 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -24,8 +24,8 @@ namespace NActors { TActorContext *ActorContext = nullptr; protected: - TActorIdentity SelfActorId = TActorIdentity(TActorId()); - TActorId ParentActorId; + TActorIdentity SelfActorId = TActorIdentity(TActorId()); + TActorId ParentActorId; private: template <typename TFirstEvent, typename... TOtherEvents> @@ -107,12 +107,12 @@ namespace NActors { TActorSystem *GetActorSystem() const { return GetActorContext().ExecutorThread.ActorSystem; } TInstant Now() const { return GetActorContext().Now(); } - bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { return GetActorContext().Send(recipient, ev, flags, cookie, std::move(traceId)); } template <typename TEvent> - bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { + bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { return GetActorContext().Send(recipient, ev.Release(), flags, cookie, std::move(traceId)); } @@ -130,7 +130,7 @@ namespace NActors { return GetActorContext().Schedule(deadline, ev, cookie); } - TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) { + TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) { return GetActorContext().Register(actor, mailboxType, poolId); } @@ -159,7 +159,7 @@ namespace NActors { , Impl(std::move(impl)) {} - TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override { + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override { return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0); } diff --git a/library/cpp/actors/core/actor_coroutine_ut.cpp b/library/cpp/actors/core/actor_coroutine_ut.cpp index 13bfdcc2bf..951512b877 100644 --- a/library/cpp/actors/core/actor_coroutine_ut.cpp +++ b/library/cpp/actors/core/actor_coroutine_ut.cpp @@ -29,7 +29,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) { }; class TBasicResponderActor: public TActorBootstrapped<TBasicResponderActor> { - TDeque<TActorId> RespondTo; + TDeque<TActorId> RespondTo; public: TBasicResponderActor() { @@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) { } void Run() override { - TActorId child = GetActorContext().Register(new TBasicResponderActor); + TActorId child = GetActorContext().Register(new TBasicResponderActor); ui32 itemsProcessed = 0; try { while (!Finish) { diff --git a/library/cpp/actors/core/actorid.cpp b/library/cpp/actors/core/actorid.cpp index 3330cd0ab3..ccda035eac 100644 --- a/library/cpp/actors/core/actorid.cpp +++ b/library/cpp/actors/core/actorid.cpp @@ -3,18 +3,18 @@ #include <util/string/cast.h> namespace NActors { - void TActorId::Out(IOutputStream& o) const { + void TActorId::Out(IOutputStream& o) const { o << "[" << NodeId() << ":" << LocalId() << ":" << Hint() << "]"; } - TString TActorId::ToString() const { + TString TActorId::ToString() const { TString x; TStringOutput o(x); Out(o); return x; } - bool TActorId::Parse(const char* buf, ui32 sz) { + bool TActorId::Parse(const char* buf, ui32 sz) { if (sz < 4 || buf[0] != '[' || buf[sz - 1] != ']') return false; diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h index c631ef3a72..d972b1a0ff 100644 --- a/library/cpp/actors/core/actorid.h +++ b/library/cpp/actors/core/actorid.h @@ -11,7 +11,7 @@ namespace NActors { // next 11 bits of node-id - pool id // next 20 bits - node id itself - struct TActorId { + struct TActorId { static constexpr ui32 MaxServiceIDLength = 12; static constexpr ui32 MaxPoolID = 0x000007FF; static constexpr ui32 MaxNodeId = 0x000FFFFF; @@ -37,19 +37,19 @@ namespace NActors { } Raw; public: - TActorId() noexcept { + TActorId() noexcept { Raw.X.X1 = 0; Raw.X.X2 = 0; } - explicit TActorId(ui32 nodeId, ui32 poolId, ui64 localId, ui32 hint) noexcept { + explicit TActorId(ui32 nodeId, ui32 poolId, ui64 localId, ui32 hint) noexcept { Y_VERIFY_DEBUG(poolId <= MaxPoolID); Raw.N.LocalId = localId; Raw.N.Hint = hint; Raw.N.NodeId = nodeId | (poolId << PoolIndexShift); } - explicit TActorId(ui32 nodeId, const TStringBuf& x) noexcept { + explicit TActorId(ui32 nodeId, const TStringBuf& x) noexcept { Y_VERIFY(x.size() <= MaxServiceIDLength, "service id is too long"); Raw.N.LocalId = 0; Raw.N.Hint = 0; @@ -57,7 +57,7 @@ namespace NActors { memcpy(Raw.Buf, x.data(), x.size()); } - explicit TActorId(ui64 x1, ui64 x2) noexcept { + explicit TActorId(ui64 x1, ui64 x2) noexcept { Raw.X.X1 = x1; Raw.X.X2 = x2; } @@ -103,7 +103,7 @@ namespace NActors { return Raw.X.X2; } - bool operator<(const TActorId& x) const noexcept { + bool operator<(const TActorId& x) const noexcept { const ui64 s1 = Raw.X.X1; const ui64 s2 = Raw.X.X2; const ui64 x1 = x.Raw.X.X1; @@ -112,11 +112,11 @@ namespace NActors { return (s1 != x1) ? (s1 < x1) : (s2 < x2); } - bool operator!=(const TActorId& x) const noexcept { + bool operator!=(const TActorId& x) const noexcept { return Raw.X.X1 != x.Raw.X.X1 || Raw.X.X2 != x.Raw.X.X2; } - bool operator==(const TActorId& x) const noexcept { + bool operator==(const TActorId& x) const noexcept { return !(x != *this); } @@ -153,19 +153,19 @@ namespace NActors { } struct THash { - ui64 operator()(const TActorId& actorId) const noexcept { - return actorId.Hash(); + ui64 operator()(const TActorId& actorId) const noexcept { + return actorId.Hash(); } }; struct THash32 { - ui64 operator()(const TActorId& actorId) const noexcept { - return actorId.Hash(); + ui64 operator()(const TActorId& actorId) const noexcept { + return actorId.Hash(); } }; struct TOrderedCmp { - bool operator()(const TActorId &left, const TActorId &right) const noexcept { + bool operator()(const TActorId &left, const TActorId &right) const noexcept { Y_VERIFY_DEBUG(!left.IsService() && !right.IsService(), "ordered compare works for plain actorids only"); const ui32 n1 = left.NodeId(); const ui32 n2 = right.NodeId(); @@ -179,18 +179,18 @@ namespace NActors { bool Parse(const char* buf, ui32 sz); }; - static_assert(sizeof(TActorId) == 16, "expect sizeof(TActorId) == 16"); + static_assert(sizeof(TActorId) == 16, "expect sizeof(TActorId) == 16"); static_assert(MaxPools < TActorId::MaxPoolID); // current implementation of united pool has limit MaxPools on pool id } template <> -inline void Out<NActors::TActorId>(IOutputStream& o, const NActors::TActorId& x) { +inline void Out<NActors::TActorId>(IOutputStream& o, const NActors::TActorId& x) { return x.Out(o); } template <> -struct THash<NActors::TActorId> { - inline ui64 operator()(const NActors::TActorId& x) const { +struct THash<NActors::TActorId> { + inline ui64 operator()(const NActors::TActorId& x) const { return x.Hash(); } }; diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index 488abd2963..c58698a206 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -21,16 +21,16 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); struct TActorSystem::TServiceMap : TNonCopyable { - NActors::TServiceMap<TActorId, TActorId, TActorId::THash> LocalMap; + NActors::TServiceMap<TActorId, TActorId, TActorId::THash> LocalMap; TTicketLock Lock; - TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) { + TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) { TTicketLock::TGuard guard(&Lock); - const TActorId old = LocalMap.Update(serviceId, actorId); + const TActorId old = LocalMap.Update(serviceId, actorId); return old; } - TActorId LookupLocal(const TActorId& x) { + TActorId LookupLocal(const TActorId& x) { return LocalMap.Find(x); } }; @@ -68,7 +68,7 @@ namespace NActors { ev->Callstack.TraceIfEmpty(); #endif - TActorId recipient = ev->GetRecipientRewrite(); + TActorId recipient = ev->GetRecipientRewrite(); const ui32 recpNodeId = recipient.NodeId(); if (recpNodeId != NodeId && recpNodeId != 0) { @@ -114,13 +114,13 @@ namespace NActors { return false; } - bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const { + bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const { return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags)); } - void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { + void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { Schedule(deadline - Timestamp(), ev, cookie); - } + } void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { const auto current = Monotonic(); @@ -131,22 +131,22 @@ namespace NActors { ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); } - void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { + void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { const auto deadline = Monotonic() + delta; - - TTicketLock::TGuard guard(&ScheduleLock); - ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); + + TTicketLock::TGuard guard(&ScheduleLock); + ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); } - TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, ui64 revolvingCounter, - const TActorId& parentId) { + TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, ui64 revolvingCounter, + const TActorId& parentId) { Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32, (ui32)executorPool, (ui32)ExecutorPoolCount); return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId); } NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType, - TActorId recipient, THolder<IEventBase> event, + TActorId recipient, THolder<IEventBase> event, TDuration timeout) { auto promise = NThreading::NewPromise<THolder<IEventBase>>(); Register(MakeAskActor(expectedEventType, recipient, std::move(event), timeout, promise).Release()); @@ -173,16 +173,16 @@ namespace NActors { return ret; } - TActorId TActorSystem::InterconnectProxy(ui32 destinationNode) const { + TActorId TActorSystem::InterconnectProxy(ui32 destinationNode) const { if (destinationNode < InterconnectCount) return Interconnect[destinationNode]; else if (destinationNode != NodeId) return MakeInterconnectProxyId(destinationNode); else - return TActorId(); + return TActorId(); } - ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) { + ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) { // TODO: get rid of this method for (ui32 i = 0; i < InterconnectCount; ++i) { Send(eventFabric(Interconnect[i])); @@ -194,9 +194,9 @@ namespace NActors { return ServiceMap->LookupLocal(x); } - TActorId TActorSystem::RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) { - // TODO: notify old actor about demotion - return ServiceMap->RegisterLocalService(serviceId, actorId); + TActorId TActorSystem::RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) { + // TODO: notify old actor about demotion + return ServiceMap->RegisterLocalService(serviceId, actorId); } void TActorSystem::GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { @@ -217,7 +217,7 @@ namespace NActors { // setup interconnect proxies { const TInterconnectSetup& setup = SystemSetup->Interconnect; - Interconnect.Reset(new TActorId[InterconnectCount + 1]); + Interconnect.Reset(new TActorId[InterconnectCount + 1]); for (ui32 i = 0, e = InterconnectCount; i != e; ++i) { const TActorSetupCmd& x = setup.ProxyActors[i]; if (x.Actor) { @@ -231,8 +231,8 @@ namespace NActors { // setup local services { for (ui32 i = 0, e = (ui32)SystemSetup->LocalServices.size(); i != e; ++i) { - const std::pair<TActorId, TActorSetupCmd>& x = SystemSetup->LocalServices[i]; - const TActorId xid = Register(x.second.Actor, x.second.MailboxType, x.second.PoolId, i); + const std::pair<TActorId, TActorSetupCmd>& x = SystemSetup->LocalServices[i]; + const TActorId xid = Register(x.second.Actor, x.second.MailboxType, x.second.PoolId, i); Y_VERIFY(!!xid); if (!!x.first) RegisterLocalService(x.first, xid); diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index c2c88ed2ec..40499d7586 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -66,17 +66,17 @@ namespace NActors { virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0; virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0; - /** - * Schedule one-shot event that will be send at given time point in the future. - * + /** + * Schedule one-shot event that will be send at given time point in the future. + * * @param deadline the wallclock time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event * @param workerId index of thread which will perform event dispatching - */ + */ virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; - - /** + + /** * Schedule one-shot event that will be send at given time point in the future. * * @param deadline the monotonic time point in future when event must be send @@ -87,21 +87,21 @@ namespace NActors { virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; /** - * Schedule one-shot event that will be send after given delay. - * - * @param delta the time from now to delay event sending - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event + * Schedule one-shot event that will be send after given delay. + * + * @param delta the time from now to delay event sending + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event * @param workerId index of thread which will perform event dispatching - */ + */ virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; - + // for actorsystem virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0; virtual void ScheduleActivation(ui32 activation) = 0; virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0; - virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0; - virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0; + virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0; + virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0; // lifecycle stuff virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0; @@ -223,7 +223,7 @@ namespace NActors { THolder<TServiceMap> ServiceMap; const ui32 InterconnectCount; - TArrayHolder<TActorId> Interconnect; + TArrayHolder<TActorId> Interconnect; volatile ui64 CurrentTimestamp; volatile ui64 CurrentMonotonic; @@ -235,7 +235,7 @@ namespace NActors { friend class TExecutorThread; THolder<TActorSystemSetup> SystemSetup; - TActorId DefSelfID; + TActorId DefSelfID; void* AppData0; TIntrusivePtr<NLog::TSettings> LoggerSettings0; TProxyWrapperFactory ProxyWrapperFactory; @@ -255,22 +255,22 @@ namespace NActors { void Stop(); void Cleanup(); - TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0, - ui64 revolvingCounter = 0, const TActorId& parentId = TActorId()); + TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0, + ui64 revolvingCounter = 0, const TActorId& parentId = TActorId()); bool Send(TAutoPtr<IEventHandle> ev) const; - bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0) const; + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0) const; /** - * Schedule one-shot event that will be send at given time point in the future. - * + * Schedule one-shot event that will be send at given time point in the future. + * * @param deadline the wallclock time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; - - /** + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; + + /** * Schedule one-shot event that will be send at given time point in the future. * * @param deadline the monotonic time point in future when event must be send @@ -280,15 +280,15 @@ namespace NActors { void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; /** - * Schedule one-shot event that will be send after given delay. - * - * @param delta the time from now to delay event sending - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; - - /** + * Schedule one-shot event that will be send after given delay. + * + * @param delta the time from now to delay event sending + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; + + /** * A way to interact with actors from non-actor context. * * This method will send the `event` to the `recipient` and then will wait for a response. When response arrives, @@ -303,7 +303,7 @@ namespace NActors { */ template <typename T> [[nodiscard]] - NThreading::TFuture<THolder<T>> Ask(TActorId recipient, THolder<IEventBase> event, TDuration timeout = TDuration::Max()) { + NThreading::TFuture<THolder<T>> Ask(TActorId recipient, THolder<IEventBase> event, TDuration timeout = TDuration::Max()) { if constexpr (std::is_same_v<T, IEventBase>) { return AskGeneric(Nothing(), recipient, std::move(event), timeout); } else { @@ -317,20 +317,20 @@ namespace NActors { [[nodiscard]] NThreading::TFuture<THolder<IEventBase>> AskGeneric( TMaybe<ui32> expectedEventType, - TActorId recipient, + TActorId recipient, THolder<IEventBase> event, TDuration timeout); ui64 AllocateIDSpace(ui64 count); - TActorId InterconnectProxy(ui32 destinationNode) const; - ui32 BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>&); + TActorId InterconnectProxy(ui32 destinationNode) const; + ui32 BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>&); void UpdateLinkStatus(ui8 status, ui32 destinationNode); ui8 LinkStatus(ui32 destinationNode); TActorId LookupLocalService(const TActorId& x) const; - TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId); + TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId); ui32 GetMaxActivityType() const { return SystemSetup ? SystemSetup->MaxActivityType : 1; diff --git a/library/cpp/actors/core/actorsystem_ut.cpp b/library/cpp/actors/core/actorsystem_ut.cpp index 2c93b12dcd..231d6f0ca1 100644 --- a/library/cpp/actors/core/actorsystem_ut.cpp +++ b/library/cpp/actors/core/actorsystem_ut.cpp @@ -1,45 +1,45 @@ -#include "actorsystem.h" - -#include <library/cpp/actors/testlib/test_runtime.h> -#include <library/cpp/testing/unittest/registar.h> - -using namespace NActors; - -Y_UNIT_TEST_SUITE(TActorSystemTest) { - - class TTestActor: public TActor<TTestActor> { - public: - TTestActor() - : TActor{&TThis::Main} - { - } - - STATEFN(Main) { - Y_UNUSED(ev); - } - }; - - THolder<TTestActorRuntimeBase> CreateRuntime() { - auto runtime = MakeHolder<TTestActorRuntimeBase>(); - runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; }); - runtime->Initialize(); - return runtime; - } - - Y_UNIT_TEST(LocalService) { - THolder<TTestActorRuntimeBase> runtime = CreateRuntime(); - auto actorA = runtime->Register(new TTestActor); - auto actorB = runtime->Register(new TTestActor); - - TActorId myServiceId{0, TStringBuf{"my-service"}}; - - auto prevActorId = runtime->RegisterService(myServiceId, actorA); - UNIT_ASSERT(!prevActorId); - UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorA); - - prevActorId = runtime->RegisterService(myServiceId, actorB); - UNIT_ASSERT(prevActorId); - UNIT_ASSERT_EQUAL(prevActorId, actorA); - UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorB); - } -} +#include "actorsystem.h" + +#include <library/cpp/actors/testlib/test_runtime.h> +#include <library/cpp/testing/unittest/registar.h> + +using namespace NActors; + +Y_UNIT_TEST_SUITE(TActorSystemTest) { + + class TTestActor: public TActor<TTestActor> { + public: + TTestActor() + : TActor{&TThis::Main} + { + } + + STATEFN(Main) { + Y_UNUSED(ev); + } + }; + + THolder<TTestActorRuntimeBase> CreateRuntime() { + auto runtime = MakeHolder<TTestActorRuntimeBase>(); + runtime->SetScheduledEventFilter([](auto&&, auto&&, auto&&, auto&&) { return false; }); + runtime->Initialize(); + return runtime; + } + + Y_UNIT_TEST(LocalService) { + THolder<TTestActorRuntimeBase> runtime = CreateRuntime(); + auto actorA = runtime->Register(new TTestActor); + auto actorB = runtime->Register(new TTestActor); + + TActorId myServiceId{0, TStringBuf{"my-service"}}; + + auto prevActorId = runtime->RegisterService(myServiceId, actorA); + UNIT_ASSERT(!prevActorId); + UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorA); + + prevActorId = runtime->RegisterService(myServiceId, actorB); + UNIT_ASSERT(prevActorId); + UNIT_ASSERT_EQUAL(prevActorId, actorA); + UNIT_ASSERT_EQUAL(runtime->GetLocalServiceId(myServiceId), actorB); + } +} diff --git a/library/cpp/actors/core/ask.cpp b/library/cpp/actors/core/ask.cpp index 4e4d6a1a2b..0054c9a906 100644 --- a/library/cpp/actors/core/ask.cpp +++ b/library/cpp/actors/core/ask.cpp @@ -19,7 +19,7 @@ namespace NActors { public: TAskActor( TMaybe<ui32> expectedEventType, - TActorId recipient, + TActorId recipient, THolder<IEventBase> event, TDuration timeout, const NThreading::TPromise<THolder<IEventBase>>& promise) @@ -55,7 +55,7 @@ namespace NActors { public: TMaybe<ui32> ExpectedEventType_; - TActorId Recipient_; + TActorId Recipient_; THolder<IEventBase> Event_; TDuration Timeout_; NThreading::TPromise<THolder<IEventBase>> Promise_; @@ -64,7 +64,7 @@ namespace NActors { THolder<IActor> MakeAskActor( TMaybe<ui32> expectedEventType, - TActorId recipient, + TActorId recipient, THolder<IEventBase> event, TDuration timeout, const NThreading::TPromise<THolder<IEventBase>>& promise) diff --git a/library/cpp/actors/core/ask.h b/library/cpp/actors/core/ask.h index b935fac564..036f1833a4 100644 --- a/library/cpp/actors/core/ask.h +++ b/library/cpp/actors/core/ask.h @@ -11,7 +11,7 @@ namespace NActors { */ THolder<IActor> MakeAskActor( TMaybe<ui32> expectedEventType, - TActorId recipient, + TActorId recipient, THolder<IEventBase> event, TDuration timeout, const NThreading::TPromise<THolder<IEventBase>>& promise); diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 9519978bc1..6ff02aaf94 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -48,9 +48,9 @@ namespace NActors { // fat handle class IEventHandle : TNonCopyable { struct TOnNondelivery { - TActorId Recipient; + TActorId Recipient; - TOnNondelivery(const TActorId& recipient) + TOnNondelivery(const TActorId& recipient) : Recipient(recipient) { } @@ -99,8 +99,8 @@ namespace NActors { const ui32 Type; const ui32 Flags; - const TActorId Recipient; - const TActorId Sender; + const TActorId Recipient; + const TActorId Sender; const ui64 Cookie; const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect @@ -108,7 +108,7 @@ namespace NActors { NWilson::TTraceId TraceId; // filled if feeded by interconnect session - const TActorId InterconnectSession; + const TActorId InterconnectSession; #ifdef ACTORSLIB_COLLECT_EXEC_STATS ::NHPTimer::STime SendTime; @@ -138,13 +138,13 @@ namespace NActors { THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; - TActorId RewriteRecipient; + TActorId RewriteRecipient; ui32 RewriteType; THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events public: - void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) { + void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) { RewriteRecipient = recipientRewrite; RewriteType = typeRewrite; } @@ -154,7 +154,7 @@ namespace NActors { RewriteType = Type; } - const TActorId& GetRecipientRewrite() const { + const TActorId& GetRecipientRewrite() const { return RewriteRecipient; } @@ -162,12 +162,12 @@ namespace NActors { return RewriteType; } - TActorId GetForwardOnNondeliveryRecipient() const { - return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId(); + TActorId GetForwardOnNondeliveryRecipient() const { + return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId(); } - IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, - const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) + IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, + const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) : Type(ev->Type()) , Flags(flags) , Recipient(recipient) @@ -187,11 +187,11 @@ namespace NActors { IEventHandle(ui32 type, ui32 flags, - const TActorId& recipient, - const TActorId& sender, + const TActorId& recipient, + const TActorId& sender, TIntrusivePtr<TEventSerializedData> buffer, ui64 cookie, - const TActorId* forwardOnNondelivery = nullptr, + const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) : Type(type) , Flags(flags) @@ -211,11 +211,11 @@ namespace NActors { } // Special ctor for events from interconnect. - IEventHandle(const TActorId& session, + IEventHandle(const TActorId& session, ui32 type, ui32 flags, - const TActorId& recipient, - const TActorId& sender, + const TActorId& recipient, + const TActorId& sender, TIntrusivePtr<TEventSerializedData> buffer, ui64 cookie, TScopeId originScopeId, @@ -276,7 +276,7 @@ namespace NActors { return x; } - TAutoPtr<IEventHandle> Forward(const TActorId& dest) { + TAutoPtr<IEventHandle> Forward(const TActorId& dest) { if (Event) return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId)); else diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index baaf333ca9..d7546b901a 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -488,11 +488,11 @@ namespace NActors { } }; - inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) { - return TActorId(actorId.GetRawX1(), actorId.GetRawX2()); + inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) { + return TActorId(actorId.GetRawX1(), actorId.GetRawX2()); } - inline void ActorIdToProto(const TActorId& src, NActorsProto::TActorId* dest) { + inline void ActorIdToProto(const TActorId& src, NActorsProto::TActorId* dest) { Y_VERIFY_DEBUG(dest); dest->SetRawX1(src.RawX1()); dest->SetRawX2(src.RawX2()); diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h index b5b9d7c9fa..702cf50fad 100644 --- a/library/cpp/actors/core/events.h +++ b/library/cpp/actors/core/events.h @@ -20,7 +20,7 @@ namespace NActors { ES_INTERCONNECT_TCP = 8, ES_PROFILER = 9, ES_YF = 10, - ES_HTTP = 11, + ES_HTTP = 11, ES_USERSPACE = 4096, @@ -99,7 +99,7 @@ namespace NActors { InvokeQuery, End, - // Compatibility section + // Compatibility section PoisonPill = Poison, ActorDied = Gone, }; @@ -191,17 +191,17 @@ namespace NActors { struct TEvCallbackException: public TEventPB<TEvCallbackException, NActorsProto::TCallbackException, TSystem::CallbackException> { - TEvCallbackException(const TActorId& id, const TString& msg) { - ActorIdToProto(id, Record.MutableActorId()); + TEvCallbackException(const TActorId& id, const TString& msg) { + ActorIdToProto(id, Record.MutableActorId()); Record.SetExceptionMessage(msg); } }; struct TEvCallbackCompletion: public TEventPB<TEvCallbackCompletion, - NActorsProto::TActorId, + NActorsProto::TActorId, TSystem::CallbackCompletion> { - TEvCallbackCompletion(const TActorId& id) { - ActorIdToProto(id, &Record); + TEvCallbackCompletion(const TActorId& id) { + ActorIdToProto(id, &Record); } }; diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp index 2a5a0b1cc6..23deaffd10 100644 --- a/library/cpp/actors/core/events_undelivered.cpp +++ b/library/cpp/actors/core/events_undelivered.cpp @@ -41,7 +41,7 @@ namespace NActors { TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(ui32 reason, bool unsure) { if (Flags & FlagForwardOnNondelivery) { const ui32 updatedFlags = Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession); - const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId(); + const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId(); if (Event) return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, TraceId.Clone()); diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h index ec55c43b40..e7a13c1005 100644 --- a/library/cpp/actors/core/executelater.h +++ b/library/cpp/actors/core/executelater.h @@ -17,8 +17,8 @@ namespace NActors { IActor::EActivityType activityType, ui32 channel = 0, ui64 cookie = 0, - const TActorId& reportCompletionTo = TActorId(), - const TActorId& reportExceptionTo = TActorId()) noexcept + const TActorId& reportCompletionTo = TActorId(), + const TActorId& reportExceptionTo = TActorId()) noexcept : Callback(std::move(callback)) , Channel(channel) , Cookie(cookie) @@ -65,8 +65,8 @@ namespace NActors { TCallback Callback; const ui32 Channel; const ui64 Cookie; - const TActorId ReportCompletionTo; - const TActorId ReportExceptionTo; + const TActorId ReportCompletionTo; + const TActorId ReportExceptionTo; }; template <typename T> @@ -75,8 +75,8 @@ namespace NActors { IActor::EActivityType activityType, ui32 channel = 0, ui64 cookie = 0, - const TActorId& reportCompletionTo = TActorId(), - const TActorId& reportExceptionTo = TActorId()) noexcept { + const TActorId& reportCompletionTo = TActorId(), + const TActorId& reportExceptionTo = TActorId()) noexcept { return new TExecuteLater<T>(std::forward<T>(func), activityType, channel, diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index 860496f108..c3b9999168 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -7,7 +7,7 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) { + void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) { actor->SelfActorId = self; actor->Registered(sys, owner); } @@ -97,7 +97,7 @@ namespace NActors { mailbox->AttachActor(localActorId, actor); // do init - const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); + const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); DoActorInit(ActorSystem, actor, actorId, parentId); // Once we unlock the mailbox the actor starts running and we cannot use the pointer any more @@ -144,7 +144,7 @@ namespace NActors { const ui64 localActorId = AllocateID(); mailbox->AttachActor(localActorId, actor); - const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); + const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); DoActorInit(ActorSystem, actor, actorId, parentId); NHPTimer::STime elapsed = GetCycleCountFast() - hpstart; if (elapsed > 1000000) { diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h index d52a242fc6..c84ce1af77 100644 --- a/library/cpp/actors/core/executor_pool_base.h +++ b/library/cpp/actors/core/executor_pool_base.h @@ -24,8 +24,8 @@ namespace NActors { ~TExecutorPoolBaseMailboxed(); void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingWriteCounter) override; bool Send(TAutoPtr<IEventHandle>& ev) override; - TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override; - TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override; + TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override; + TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override; bool Cleanup() override; }; diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 936d2e94a7..4dce16939a 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -315,11 +315,11 @@ namespace NActors { void TBasicExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { Y_VERIFY_DEBUG(workerId < PoolThreads); - + const auto deadline = ActorSystem->Monotonic() + delta; ScheduleWriters[workerId].Push(deadline.MicroSeconds(), ev.Release(), cookie); - } - + } + void TBasicExecutorPool::SetRealTimeMode() const { // TODO: musl-libc version of `sched_param` struct is for some reason different from pthread // version in Ubuntu 12.04 diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index dd83c85c74..023190f7fe 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -6,7 +6,7 @@ #include "executor_pool_base.h" #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/actors/util/threadparkpad.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <util/system/mutex.h> @@ -87,7 +87,7 @@ namespace NActors { void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; - + void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override; void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 8c170c2d84..76dff693af 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -25,7 +25,7 @@ private: private: TAtomic Counter; - TActorId Receiver; + TActorId Receiver; std::function<void(void)> Action; @@ -36,7 +36,7 @@ public: , Action(action) {} - void Start(TActorId receiver, size_t count) + void Start(TActorId receiver, size_t count) { AtomicSet(Counter, count); Receiver = receiver; @@ -102,7 +102,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { executorPool->SetThreadCount(halfSize); TTestSenderActor* actors[size]; - TActorId actorIds[size]; + TActorId actorIds[size]; for (size_t i = 0; i < size; ++i) { actors[i] = new TTestSenderActor(); actorIds[i] = actorSystem.Register(actors[i]); @@ -176,7 +176,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { executorPool->SetThreadCount(halfSize); TTestSenderActor* actors[size]; - TActorId actorIds[size]; + TActorId actorIds[size]; for (size_t i = 0; i < size; ++i) { actors[i] = new TTestSenderActor(); actorIds[i] = actorSystem.Register(actors[i]); @@ -201,7 +201,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { counter = 0; } }); - TActorId changerActorId = actorSystem.Register(changerActor); + TActorId changerActorId = actorSystem.Register(changerActor); changerActor->Start(changerActorId, msgCount); actorSystem.Send(changerActorId, new TEvMsg()); @@ -260,7 +260,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { auto begin = TInstant::Now(); TTestSenderActor* actors[size]; - TActorId actorIds[size]; + TActorId actorIds[size]; for (size_t i = 0; i < size; ++i) { actors[i] = new TTestSenderActor(); @@ -304,7 +304,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { auto begin = TInstant::Now(); TTestSenderActor* actors[actorsCount]; - TActorId actorIds[actorsCount]; + TActorId actorIds[actorsCount]; for (size_t i = 0; i < actorsCount; ++i) { actors[i] = new TTestSenderActor(); @@ -348,7 +348,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { auto begin = TInstant::Now(); TTestSenderActor* actors[actorsCount]; - TActorId actorIds[actorsCount]; + TActorId actorIds[actorsCount]; for (size_t i = 0; i < actorsCount; ++i) { actors[i] = new TTestSenderActor(); diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp index 025b5a22c2..fb557ae6b0 100644 --- a/library/cpp/actors/core/executor_pool_io.cpp +++ b/library/cpp/actors/core/executor_pool_io.cpp @@ -81,11 +81,11 @@ namespace NActors { void TIOExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { Y_UNUSED(workerId); const auto deadline = ActorSystem->Monotonic() + delta; - - TTicketLock::TGuard guard(&ScheduleLock); - ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); - } - + + TTicketLock::TGuard guard(&ScheduleLock); + ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); + } + void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) { Activations.Push(activation, revolvingWriteCounter); const TAtomic x = AtomicIncrement(Semaphore); diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h index a1359ba4ab..e576d642a1 100644 --- a/library/cpp/actors/core/executor_pool_io.h +++ b/library/cpp/actors/core/executor_pool_io.h @@ -35,7 +35,7 @@ namespace NActors { void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; - + void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override; void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index da4934eccd..dac6245635 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -1255,13 +1255,13 @@ namespace NActors { inline bool TUnitedWorkers::NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) { return Pools[pool].NextExecution(activation, revolvingCounter); } - + inline void TUnitedWorkers::StopExecution(TPoolId pool) { if (Pools[pool].StopExecution()) { // pending token TryWake(pool); } - } - + } + inline void TUnitedWorkers::Balance() { ui64 ts = GetCycleCountFast(); if (Balancer->TryLock(ts)) { diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h index b1af850312..a090ba2466 100644 --- a/library/cpp/actors/core/executor_pool_united.h +++ b/library/cpp/actors/core/executor_pool_united.h @@ -7,7 +7,7 @@ #include <library/cpp/actors/util/unordered_cache.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -79,7 +79,7 @@ namespace NActors { // Stop currently active execution and start new one if token is available // NOTE: Reuses token if it's not destroyed bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); - + // Stop active execution void StopExecution(TPoolId pool); diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index be92b9352a..d4df17f1b8 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -37,7 +37,7 @@ private: private: TAtomic Counter; - TActorId Receiver; + TActorId Receiver; std::function<void(void)> Action; diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 03ef88ea51..446b651efd 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -50,14 +50,14 @@ namespace NActors { &Ctx.WorkerStats); } - TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, const TActorId& parentId) { + TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, const TActorId& parentId) { if (poolId == Max<ui32>()) return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient); else return ActorSystem->Register(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient); } - TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) { + TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) { return Ctx.Executor->Register(actor, mailbox, hint, parentId ? parentId : CurrentRecipient); } @@ -71,7 +71,7 @@ namespace NActors { DyingActors.clear(); // here is actual destruction of actors } - void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId); } @@ -81,11 +81,11 @@ namespace NActors { Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId); } - void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { - ++CurrentActorScheduledEventsCounter; + void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(delta, ev, cookie, Ctx.WorkerId); - } - + } + template <class T> inline TString SafeTypeName(T* t) { if (t == nullptr) { @@ -102,7 +102,7 @@ namespace NActors { return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)"); } - inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient, + inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient, double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) { const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr; LWPROBE(EventSlowDelivery, @@ -116,7 +116,7 @@ namespace NActors { } inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const IActor* actor, ui32 poolId, ui32 activityType, - const TActorId& currentRecipient, double eventMs) { + const TActorId& currentRecipient, double eventMs) { // Event could have been destroyed by actor->Receive(); const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr; LWPROBE(SlowEvent, @@ -198,7 +198,7 @@ namespace NActors { if (actor) actor->AddElapsedTicks(elapsed); - CurrentRecipient = TActorId(); + CurrentRecipient = TActorId(); } else { TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); if (nonDelivered.Get()) { diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index f3f1d527d6..9d3c573f0d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -39,17 +39,17 @@ namespace NActors { : TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox) {} - TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(), - const TActorId& parentId = TActorId()); - TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId()); + TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(), + const TActorId& parentId = TActorId()); + TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId()); void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId); void DropUnregistered(); const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; } - void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); - void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); - + void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + bool Send(TAutoPtr<IEventHandle> ev) { #ifdef USE_ACTOR_CALLSTACK ev->Callstack = TCallstack::GetTlsCallstack(); @@ -81,7 +81,7 @@ namespace NActors { // Event-specific (currently executing) TVector<THolder<IActor>> DyingActors; - TActorId CurrentRecipient; + TActorId CurrentRecipient; ui64 CurrentActorScheduledEventsCounter = 0; // Thread-specific diff --git a/library/cpp/actors/core/invoke.h b/library/cpp/actors/core/invoke.h index 26de350a95..931a9767dd 100644 --- a/library/cpp/actors/core/invoke.h +++ b/library/cpp/actors/core/invoke.h @@ -92,7 +92,7 @@ namespace NActors { , Complete(std::move(complete)) {} - void Bootstrap(const TActorId& parentId, const TActorContext& ctx) { + void Bootstrap(const TActorId& parentId, const TActorContext& ctx) { auto process = [complete = std::move(Complete)](TEvents::TEvInvokeResult& res, const TActorContext& ctx) { complete([&] { return res.GetResult<TCallback>(); }, ctx); }; diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 88e24f4c01..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -1,7 +1,7 @@ #include "log.h" #include "log_settings.h" -#include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/monlib/service/pages/templates.h> static_assert(int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG), "expect int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG)"); static_assert(int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT), "expect int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT)"); diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index c7b6e85bef..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -14,7 +14,7 @@ #include <util/string/printf.h> #include <util/string/builder.h> #include <library/cpp/logger/all.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/monlib/metrics/metric_registry.h> #include <library/cpp/json/writer/json.h> #include <library/cpp/svnversion/svnversion.h> @@ -323,7 +323,7 @@ namespace NActors { { const NLog::TSettings *mSettings = ctx.LoggerSettings(); TLoggerActor::Throttle(*mSettings); - ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); + ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); } template <typename TCtx, typename... TArgs> diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp index de2a3a9a68..f52f2fc5d2 100644 --- a/library/cpp/actors/core/log_settings.cpp +++ b/library/cpp/actors/core/log_settings.cpp @@ -4,7 +4,7 @@ namespace NActors { namespace NLog { - TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, + TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EComponent minVal, EComponent maxVal, EComponentToStringFunc func, EPriority defPriority, EPriority defSamplingPriority, ui32 defSamplingRate, ui64 timeThresholdMs) @@ -27,7 +27,7 @@ namespace NActors { Append(minVal, maxVal, func); } - TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, + TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EPriority defPriority, EPriority defSamplingPriority, ui32 defSamplingRate, ui64 timeThresholdMs) : LoggerActorId(loggerActorId) diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index 564d2db73e..7fe4504edd 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -69,7 +69,7 @@ namespace NActors { struct TSettings: public TThrRefBase { public: - TActorId LoggerActorId; + TActorId LoggerActorId; EComponent LoggerComponent; ui64 TimeThresholdMs; bool AllowDrop; @@ -98,12 +98,12 @@ namespace NActors { // protobuf enumeration of components. In this case protoc // automatically generates YOURTYPE_MIN, YOURTYPE_MAX and // YOURTYPE_Name for you. - TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, + TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EComponent minVal, EComponent maxVal, EComponentToStringFunc func, EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG, ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000); - TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, + TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG, ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000); diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp index 2a65e270de..09b5f88ea2 100644 --- a/library/cpp/actors/core/log_ut.cpp +++ b/library/cpp/actors/core/log_ut.cpp @@ -15,7 +15,7 @@ namespace { } TIntrusivePtr<TSettings> DefaultSettings() { - auto loggerId = TActorId{0, "Logger"}; + auto loggerId = TActorId{0, "Logger"}; auto s = MakeIntrusive<TSettings>(loggerId, 0, EPriority::PRI_TRACE); s->SetAllowDrop(false); s->Append(0, 1, ServiceToString); @@ -98,7 +98,7 @@ namespace { TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()}; std::shared_ptr<TMockBackend> LogBackend; - TActorId LoggerActor; + TActorId LoggerActor; TTestActorRuntimeBase Runtime; }; } diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index 40fcebaa72..d84b4f9e46 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -163,7 +163,7 @@ namespace NActors { } bool TMailboxTable::SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) { - const TActorId& recipient = ev->GetRecipientRewrite(); + const TActorId& recipient = ev->GetRecipientRewrite(); const ui32 hint = recipient.Hint(); // copy-paste from Get to avoid duplicated type-switches diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 38a03af42d..0bd9c4d314 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -305,14 +305,14 @@ namespace NActors { static const ui32 LineIndexShift = 12; static const ui32 LineIndexMask = 0x1FFFFu << LineIndexShift; static const ui32 LineHintMask = 0xFFFu; - static const ui32 PoolIndexShift = TActorId::PoolIndexShift; - static const ui32 PoolIndexMask = TActorId::PoolIndexMask; + static const ui32 PoolIndexShift = TActorId::PoolIndexShift; + static const ui32 PoolIndexMask = TActorId::PoolIndexMask; static ui32 LineIndex(ui32 hint) { return ((hint & LineIndexMask) >> LineIndexShift); } static ui32 PoolIndex(ui32 hint) { - return TActorId::PoolIndex(hint); + return TActorId::PoolIndex(hint); } TMailboxHeader* Get(ui32 hint); diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h index 45e0e7ff65..c450f2338e 100644 --- a/library/cpp/actors/core/mon.h +++ b/library/cpp/actors/core/mon.h @@ -2,8 +2,8 @@ #include "events.h" #include "event_local.h" -#include <library/cpp/monlib/service/monservice.h> -#include <library/cpp/monlib/service/pages/mon_page.h> +#include <library/cpp/monlib/service/monservice.h> +#include <library/cpp/monlib/service/pages/mon_page.h> namespace NActors { namespace NMon { diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp index 61bf7452a7..0e1dbd0031 100644 --- a/library/cpp/actors/core/process_stats.cpp +++ b/library/cpp/actors/core/process_stats.cpp @@ -3,8 +3,8 @@ #include "hfunc.h" #include "process_stats.h" -#include <library/cpp/monlib/dynamic_counters/counters.h> -#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/metrics/metric_registry.h> #include <util/datetime/uptime.h> #include <util/system/defaults.h> @@ -197,7 +197,7 @@ namespace { MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true); MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true); UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false); - NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false); + NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false); SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false); } @@ -213,7 +213,7 @@ namespace { *MinorPageFaults = procStat.MinFlt; *MajorPageFaults = procStat.MajFlt; *UptimeSeconds = procStat.Uptime.Seconds(); - *NumThreads = procStat.NumThreads; + *NumThreads = procStat.NumThreads; *SystemUptimeSeconds = procStat.Uptime.Seconds(); } @@ -228,7 +228,7 @@ namespace { NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults; NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults; NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds; - NMonitoring::TDynamicCounters::TCounterPtr NumThreads; + NMonitoring::TDynamicCounters::TCounterPtr NumThreads; NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds; }; @@ -236,7 +236,7 @@ namespace { class TRegistryCollector: public TProcStatCollectingActor<TRegistryCollector> { using TBase = TProcStatCollectingActor<TRegistryCollector>; public: - TRegistryCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) + TRegistryCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) : TBase{interval} { VmSize = registry.IntGauge({{"sensor", "process.VmSize"}}); @@ -244,13 +244,13 @@ namespace { FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}}); CGroupMemLimit = registry.IntGauge({{"sensor", "process.CGroupMemLimit"}}); UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}}); - NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}}); + NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}}); SystemUptimeSeconds = registry.IntGauge({{"sensor", "system.UptimeSeconds"}}); - - UserTime = registry.Rate({{"sensor", "process.UserTime"}}); - SysTime = registry.Rate({{"sensor", "process.SystemTime"}}); - MinorPageFaults = registry.Rate({{"sensor", "process.MinorPageFaults"}}); - MajorPageFaults = registry.Rate({{"sensor", "process.MajorPageFaults"}}); + + UserTime = registry.Rate({{"sensor", "process.UserTime"}}); + SysTime = registry.Rate({{"sensor", "process.SystemTime"}}); + MinorPageFaults = registry.Rate({{"sensor", "process.MinorPageFaults"}}); + MajorPageFaults = registry.Rate({{"sensor", "process.MajorPageFaults"}}); } void UpdateCounters(const TProcStat& procStat) { @@ -259,23 +259,23 @@ namespace { FileRssSize->Set(procStat.FileRss); CGroupMemLimit->Set(procStat.CGroupMemLim); UptimeSeconds->Set(procStat.Uptime.Seconds()); - NumThreads->Set(procStat.NumThreads); + NumThreads->Set(procStat.NumThreads); SystemUptimeSeconds->Set(procStat.SystemUptime.Seconds()); - - // it is ok here to reset and add metric value, because mutation - // is performed in siglethreaded context - - UserTime->Reset(); - UserTime->Add(procStat.Utime); - - SysTime->Reset(); - SysTime->Add(procStat.Stime); - - MinorPageFaults->Reset(); - MinorPageFaults->Add(procStat.MinFlt); - - MajorPageFaults->Reset(); - MajorPageFaults->Add(procStat.MajFlt); + + // it is ok here to reset and add metric value, because mutation + // is performed in siglethreaded context + + UserTime->Reset(); + UserTime->Add(procStat.Utime); + + SysTime->Reset(); + SysTime->Add(procStat.Stime); + + MinorPageFaults->Reset(); + MinorPageFaults->Add(procStat.MinFlt); + + MajorPageFaults->Reset(); + MajorPageFaults->Add(procStat.MajFlt); } private: @@ -283,12 +283,12 @@ namespace { NMonitoring::TIntGauge* AnonRssSize; NMonitoring::TIntGauge* FileRssSize; NMonitoring::TIntGauge* CGroupMemLimit; - NMonitoring::TRate* UserTime; - NMonitoring::TRate* SysTime; - NMonitoring::TRate* MinorPageFaults; - NMonitoring::TRate* MajorPageFaults; + NMonitoring::TRate* UserTime; + NMonitoring::TRate* SysTime; + NMonitoring::TRate* MinorPageFaults; + NMonitoring::TRate* MajorPageFaults; NMonitoring::TIntGauge* UptimeSeconds; - NMonitoring::TIntGauge* NumThreads; + NMonitoring::TIntGauge* NumThreads; NMonitoring::TIntGauge* SystemUptimeSeconds; }; } // namespace @@ -297,7 +297,7 @@ namespace { return new TDynamicCounterCollector(intervalSec, counters); } - IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) { + IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) { return new TRegistryCollector(interval, registry); } } diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h index 4e6bb31090..66346d0b5a 100644 --- a/library/cpp/actors/core/process_stats.h +++ b/library/cpp/actors/core/process_stats.h @@ -3,10 +3,10 @@ #include "defs.h" #include "actor.h" -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> namespace NMonitoring { - class TMetricRegistry; + class TMetricRegistry; } namespace NActors { @@ -62,5 +62,5 @@ namespace NActors { }; IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters); - IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry); + IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry); } diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp index d1e6501fd1..febc5e40dd 100644 --- a/library/cpp/actors/core/scheduler_actor.cpp +++ b/library/cpp/actors/core/scheduler_actor.cpp @@ -39,7 +39,7 @@ namespace NActors { TVector<NSchedulerQueue::TReader*> Readers; - TActorId PollerActor; + TActorId PollerActor; TPollerToken::TPtr PollerToken; ui64 RealTime; @@ -68,7 +68,7 @@ namespace NActors { : TActor(&TSchedulerActor::StateFunc) , Cfg(cfg) , TimerDescriptor(new TTimerDescriptor()) - , PollerActor(MakePollerActorId()) + , PollerActor(MakePollerActorId()) { Y_ASSERT(Cfg.ResolutionMicroseconds != 0); Y_ASSERT(Cfg.ProgressThreshold != 0); diff --git a/library/cpp/actors/core/scheduler_actor.h b/library/cpp/actors/core/scheduler_actor.h index 600f8d98ff..c2c561b43d 100644 --- a/library/cpp/actors/core/scheduler_actor.h +++ b/library/cpp/actors/core/scheduler_actor.h @@ -21,9 +21,9 @@ namespace NActors { IActor* CreateSchedulerActor(const TSchedulerConfig& cfg); - inline TActorId MakeSchedulerActorId() { + inline TActorId MakeSchedulerActorId() { char x[12] = {'s', 'c', 'h', 'e', 'd', 'u', 'l', 'e', 'r', 's', 'e', 'r'}; - return TActorId(0, TStringBuf(x, 12)); + return TActorId(0, TStringBuf(x, 12)); } } diff --git a/library/cpp/actors/core/scheduler_actor_ut.cpp b/library/cpp/actors/core/scheduler_actor_ut.cpp index dae14cbe67..09b7369d36 100644 --- a/library/cpp/actors/core/scheduler_actor_ut.cpp +++ b/library/cpp/actors/core/scheduler_actor_ut.cpp @@ -60,14 +60,14 @@ Y_UNIT_TEST_SUITE(SchedulerActor) { setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic"); } // create poller actor (whether platform supports it) - TActorId pollerActorId; + TActorId pollerActorId; if (IActor* poller = CreatePollerActor()) { - pollerActorId = MakePollerActorId(); + pollerActorId = MakePollerActorId(); setup->LocalServices.emplace_back(pollerActorId, TActorSetupCmd(poller, TMailboxType::ReadAsFilled, 0)); } - TActorId schedulerActorId; + TActorId schedulerActorId; if (IActor* schedulerActor = CreateSchedulerActor(TSchedulerConfig())) { - schedulerActorId = MakeSchedulerActorId(); + schedulerActorId = MakeSchedulerActorId(); setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0)); } setup->Scheduler = CreateSchedulerThread(TSchedulerConfig()); diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make index 11b2ea3eb7..3ee28d5850 100644 --- a/library/cpp/actors/core/ut/ya.make +++ b/library/cpp/actors/core/ut/ya.make @@ -29,18 +29,18 @@ PEERDIR( ) SRCS( - actor_coroutine_ut.cpp - actor_ut.cpp - actorsystem_ut.cpp + actor_coroutine_ut.cpp + actor_ut.cpp + actorsystem_ut.cpp ask_ut.cpp balancer_ut.cpp - event_pb_payload_ut.cpp + event_pb_payload_ut.cpp event_pb_ut.cpp executor_pool_basic_ut.cpp executor_pool_united_ut.cpp log_ut.cpp memory_tracker_ut.cpp - scheduler_actor_ut.cpp + scheduler_actor_ut.cpp ) END() diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 40f27456c8..880a9d00db 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -111,7 +111,7 @@ PEERDIR( library/cpp/json/writer library/cpp/logger library/cpp/lwtrace - library/cpp/monlib/dynamic_counters + library/cpp/monlib/dynamic_counters library/cpp/svnversion library/cpp/threading/future ) diff --git a/library/cpp/actors/helpers/activeactors.h b/library/cpp/actors/helpers/activeactors.h index b0e4f5cc99..0fdb0fab10 100644 --- a/library/cpp/actors/helpers/activeactors.h +++ b/library/cpp/actors/helpers/activeactors.h @@ -10,9 +10,9 @@ namespace NActors { // TActiveActors // This class helps manage created actors and kill them all on PoisonPill. //////////////////////////////////////////////////////////////////////////// - class TActiveActors : public THashSet<TActorId> { + class TActiveActors : public THashSet<TActorId> { public: - void Insert(const TActorId &aid) { + void Insert(const TActorId &aid) { bool inserted = insert(aid).second; Y_VERIFY(inserted); } @@ -23,7 +23,7 @@ namespace NActors { } } - void Erase(const TActorId &aid) { + void Erase(const TActorId &aid) { auto num = erase(aid); Y_VERIFY(num == 1); } diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp index 61610ec3d3..d75cc54023 100644 --- a/library/cpp/actors/helpers/flow_controlled_queue.cpp +++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp @@ -18,12 +18,12 @@ class TFlowControlledRequestActor : public IActor { void HandleReply(TAutoPtr<IEventHandle> &ev); void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev); public: - const TActorId Source; + const TActorId Source; const ui64 Cookie; const ui32 Flags; const ui64 StartCounter; - TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags) + TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags) : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity) , QueueActor(queue) , Source(source) @@ -49,7 +49,7 @@ public: }; class TFlowControlledRequestQueue : public IActor { - const TActorId Target; + const TActorId Target; const TFlowControlledQueueConfig Config; TDeque<THolder<IEventHandle>> UnhandledRequests; @@ -123,7 +123,7 @@ class TFlowControlledRequestQueue : public IActor { if (reqActor) { if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) { TActivationContext::Send( - new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) + new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) ); } reqActor->PassAway(); @@ -153,7 +153,7 @@ class TFlowControlledRequestQueue : public IActor { PassAway(); } public: - TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config) + TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config) : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity) , Target(target) , Config(config) @@ -208,7 +208,7 @@ void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPt } -IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) { +IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) { return new TFlowControlledRequestQueue(targetId, activity, config); } diff --git a/library/cpp/actors/helpers/flow_controlled_queue.h b/library/cpp/actors/helpers/flow_controlled_queue.h index 1d03226103..d250405304 100644 --- a/library/cpp/actors/helpers/flow_controlled_queue.h +++ b/library/cpp/actors/helpers/flow_controlled_queue.h @@ -13,6 +13,6 @@ namespace NActors { ui32 LatencyFactor = 4; }; - IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity = IActor::ACTORLIB_COMMON, const TFlowControlledQueueConfig &config = TFlowControlledQueueConfig()); + IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity = IActor::ACTORLIB_COMMON, const TFlowControlledQueueConfig &config = TFlowControlledQueueConfig()); } diff --git a/library/cpp/actors/helpers/mon_histogram_helper.h b/library/cpp/actors/helpers/mon_histogram_helper.h index 80b9690a75..a9a57e3823 100644 --- a/library/cpp/actors/helpers/mon_histogram_helper.h +++ b/library/cpp/actors/helpers/mon_histogram_helper.h @@ -1,9 +1,9 @@ #pragma once -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/string/cast.h> -#include <util/string/cast.h> - namespace NActors { namespace NMon { class THistogramCounterHelper { @@ -79,7 +79,7 @@ namespace NActors { ui64 FirstBucketVal; ui64 BucketCount; TVector<NMonitoring::TDynamicCounters::TCounterPtr> BucketsHolder; - TVector<NMonitoring::TDeprecatedCounter*> Buckets; + TVector<NMonitoring::TDeprecatedCounter*> Buckets; }; } diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h index d1f320509e..d7d07f9fa8 100644 --- a/library/cpp/actors/helpers/selfping_actor.h +++ b/library/cpp/actors/helpers/selfping_actor.h @@ -1,7 +1,7 @@ #pragma once #include <library/cpp/actors/core/actor.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> namespace NActors { diff --git a/library/cpp/actors/helpers/ya.make b/library/cpp/actors/helpers/ya.make index 0169a2c727..d8771179de 100644 --- a/library/cpp/actors/helpers/ya.make +++ b/library/cpp/actors/helpers/ya.make @@ -14,7 +14,7 @@ SRCS( PEERDIR( library/cpp/actors/core - library/cpp/monlib/dynamic_counters + library/cpp/monlib/dynamic_counters ) END() diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp index 834fe47b73..27c4eeb6f3 100644 --- a/library/cpp/actors/http/http_cache.cpp +++ b/library/cpp/actors/http/http_cache.cpp @@ -16,7 +16,7 @@ namespace NHttp { class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig { public: using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>; - NActors::TActorId HttpProxyId; + NActors::TActorId HttpProxyId; TGetCachePolicy GetCachePolicy; static constexpr TDuration RefreshTimeout = TDuration::Seconds(1); @@ -584,7 +584,7 @@ TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePoli return policy; } -NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { +NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy)); } diff --git a/library/cpp/actors/http/http_cache.h b/library/cpp/actors/http/http_cache.h index 313c7bd266..ac38bdcac8 100644 --- a/library/cpp/actors/http/http_cache.h +++ b/library/cpp/actors/http/http_cache.h @@ -19,7 +19,7 @@ struct TCachePolicy { using TGetCachePolicy = std::function<TCachePolicy(const THttpRequest*)>; -NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); +NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& policy = TCachePolicy()); diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index 3a466006cd..36c6855d93 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -1,5 +1,5 @@ #include <library/cpp/actors/core/events.h> -#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/metrics/metric_registry.h> #include "http_proxy.h" namespace NHttp { @@ -8,7 +8,7 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC public: IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller); - TActorId acceptorId = ctx.Register(listeningSocket); + TActorId acceptorId = ctx.Register(listeningSocket); ctx.Send(event->Forward(acceptorId)); Acceptors.emplace_back(acceptorId); return listeningSocket; @@ -16,7 +16,7 @@ public: IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) { IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller); - TActorId connectionId = ctx.Register(connectionSocket); + TActorId connectionId = ctx.Register(connectionSocket); Connections.emplace(connectionId); return connectionSocket; } @@ -26,7 +26,7 @@ public: Become(&THttpProxy::StateWork); } - THttpProxy(NMonitoring::TMetricRegistry& sensors) + THttpProxy(NMonitoring::TMetricRegistry& sensors) : Sensors(sensors) {} @@ -49,10 +49,10 @@ protected: void PassAway() override { Send(Poller, new NActors::TEvents::TEvPoisonPill()); - for (const NActors::TActorId& connection : Connections) { + for (const NActors::TActorId& connection : Connections) { Send(connection, new NActors::TEvents::TEvPoisonPill()); } - for (const NActors::TActorId& acceptor : Acceptors) { + for (const NActors::TActorId& acceptor : Acceptors) { Send(acceptor, new NActors::TEvents::TEvPoisonPill()); } NActors::TActorBootstrapped<THttpProxy>::PassAway(); @@ -60,7 +60,7 @@ protected: void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { TStringBuf url = event->Get()->Request->URL.Before('?'); - THashMap<TString, TActorId>::iterator it; + THashMap<TString, TActorId>::iterator it; while (!url.empty()) { it = Handlers.find(url); if (it != Handlers.end()) { @@ -204,8 +204,8 @@ protected: PassAway(); } - NActors::TActorId Poller; - TVector<TActorId> Acceptors; + NActors::TActorId Poller; + TVector<TActorId> Acceptors; struct THostEntry { TSockAddrInet6 Address; @@ -215,9 +215,9 @@ protected: static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60); THashMap<TString, THostEntry> Hosts; - THashMap<TString, TActorId> Handlers; - THashSet<TActorId> Connections; // outgoing - NMonitoring::TMetricRegistry& Sensors; + THashMap<TString, TActorId> Handlers; + THashSet<TActorId> Connections; // outgoing + NMonitoring::TMetricRegistry& Sensors; }; TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) { @@ -240,7 +240,7 @@ TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingR ); } -NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) { +NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) { return new THttpProxy(sensors); } diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h index 97ea6fbd44..afd0170997 100644 --- a/library/cpp/actors/http/http_proxy.h +++ b/library/cpp/actors/http/http_proxy.h @@ -8,7 +8,7 @@ #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/interconnect/poller_actor.h> #include <library/cpp/dns/cache.h> -#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/metrics/metric_registry.h> #include <util/generic/variant.h> #include "http.h" #include "http_proxy_ssl.h" @@ -25,7 +25,7 @@ struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig { struct TEvHttpProxy { enum EEv { - EvAddListeningPort = EventSpaceBegin(NActors::TEvents::ES_HTTP), + EvAddListeningPort = EventSpaceBegin(NActors::TEvents::ES_HTTP), EvConfirmListen, EvRegisterHandler, EvHttpIncomingRequest, @@ -41,7 +41,7 @@ struct TEvHttpProxy { EvEnd }; - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small."); + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small."); struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> { TIpPort Port; @@ -71,9 +71,9 @@ struct TEvHttpProxy { struct TEvRegisterHandler : NActors::TEventLocal<TEvRegisterHandler, EvRegisterHandler> { TString Path; - TActorId Handler; + TActorId Handler; - TEvRegisterHandler(const TString& path, const TActorId& handler) + TEvRegisterHandler(const TString& path, const TActorId& handler) : Path(path) , Handler(handler) {} @@ -142,32 +142,32 @@ struct TEvHttpProxy { struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> { TString PeerAddress; - TActorId ConnectionID; + TActorId ConnectionID; - TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID) + TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID) : PeerAddress(peerAddress) , ConnectionID(connectionID) {} }; struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> { - TActorId ConnectionID; + TActorId ConnectionID; TDeque<THttpIncomingRequestPtr> RecycledRequests; - TEvHttpConnectionClosed(const TActorId& connectionID) + TEvHttpConnectionClosed(const TActorId& connectionID) : ConnectionID(connectionID) {} - TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests) + TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests) : ConnectionID(connectionID) , RecycledRequests(std::move(recycledRequests)) {} }; struct TEvHttpAcceptorClosed : NActors::TEventLocal<TEvHttpAcceptorClosed, EvHttpAcceptorClosed> { - TActorId ConnectionID; + TActorId ConnectionID; - TEvHttpAcceptorClosed(const TActorId& connectionID) + TEvHttpAcceptorClosed(const TActorId& connectionID) : ConnectionID(connectionID) {} }; @@ -218,16 +218,16 @@ struct TEvHttpProxy { }; struct TEndpointInfo { - TActorId Proxy; - TActorId Owner; + TActorId Proxy; + TActorId Owner; TString WorkerName; bool Secure; TSslHelpers::TSslHolder<SSL_CTX> SecureContext; }; -NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors); -NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller); -NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller); +NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors); +NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller); +NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller); NActors::IActor* CreateIncomingConnectionActor( const TEndpointInfo& endpoint, TIntrusivePtr<TSocketDescriptor> socket, diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp index 95b07ffa84..9780541b71 100644 --- a/library/cpp/actors/http/http_proxy_acceptor.cpp +++ b/library/cpp/actors/http/http_proxy_acceptor.cpp @@ -7,15 +7,15 @@ namespace NHttp { class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig { public: using TBase = NActors::TActor<TAcceptorActor>; - const TActorId Owner; - const TActorId Poller; + const TActorId Owner; + const TActorId Poller; TIntrusivePtr<TSocketDescriptor> Socket; NActors::TPollerToken::TPtr PollerToken; - THashSet<TActorId> Connections; + THashSet<TActorId> Connections; TDeque<THttpIncomingRequestPtr> RecycledRequests; TEndpointInfo Endpoint; - TAcceptorActor(const TActorId& owner, const TActorId& poller) + TAcceptorActor(const TActorId& owner, const TActorId& poller) : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit) , Owner(owner) , Poller(poller) @@ -77,12 +77,12 @@ protected: } } LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << bindAddress.ToString() << " - retrying..."); - ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release()); + ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release()); } void Die(const NActors::TActorContext& ctx) override { ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID)); - for (const NActors::TActorId& connection : Connections) { + for (const NActors::TActorId& connection : Connections) { ctx.Send(connection, new NActors::TEvents::TEvPoisonPill()); } } @@ -104,7 +104,7 @@ protected: connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front())); RecycledRequests.pop_front(); } - NActors::TActorId connectionId = ctx.Register(connectionSocket); + NActors::TActorId connectionId = ctx.Register(connectionSocket); ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId)); Connections.emplace(connectionId); socket = new TSocketDescriptor(); @@ -128,7 +128,7 @@ protected: } }; -NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) { +NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) { return new TAcceptorActor(owner, poller); } diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp index 5bd4dd74b0..d9189dba8a 100644 --- a/library/cpp/actors/http/http_proxy_outgoing.cpp +++ b/library/cpp/actors/http/http_proxy_outgoing.cpp @@ -8,18 +8,18 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor public: using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>; using TSelf = TOutgoingConnectionActor<TSocketImpl>; - const TActorId Owner; - const TActorId Poller; + const TActorId Owner; + const TActorId Poller; SocketAddressType Address; TString Host; - TActorId RequestOwner; + TActorId RequestOwner; THttpOutgoingRequestPtr Request; THttpIncomingResponsePtr Response; TInstant LastActivity; TDuration ConnectionTimeout = CONNECTION_TIMEOUT; NActors::TPollerToken::TPtr PollerToken; - TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller) + TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller) : TBase(&TSelf::StateWaiting) , Owner(owner) , Poller(poller) @@ -38,7 +38,7 @@ public: void ReplyAndDie(const NActors::TActorContext& ctx) { LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")"); ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response)); - RequestOwner = TActorId(); + RequestOwner = TActorId(); THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response)); ctx.Send(Owner, sensors.Release()); LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); @@ -49,7 +49,7 @@ public: LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error); if (RequestOwner) { ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error)); - RequestOwner = TActorId(); + RequestOwner = TActorId(); THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response)); ctx.Send(Owner, sensors.Release()); Die(ctx); @@ -287,7 +287,7 @@ protected: } }; -NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) { +NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) { if (secure) { return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller); } else { diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index b21ceb550f..4c922f8d0f 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -180,17 +180,17 @@ Y_UNIT_TEST_SUITE(HttpProxy) { TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); - NMonitoring::TMetricRegistry sensors; + NMonitoring::TMetricRegistry sensors; NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); - NActors::TActorId proxyId = actorSystem.Register(proxy); - actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); + NActors::TActorId proxyId = actorSystem.Register(proxy); + actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); actorSystem.DispatchEvents(); - NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); + NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); - NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); + NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test"); actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); @@ -213,7 +213,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) { TIpPort port = portManager.GetTcpPort(); TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); - NMonitoring::TMetricRegistry sensors; + NMonitoring::TMetricRegistry sensors; TString certificateContent = R"___(-----BEGIN PRIVATE KEY----- MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w @@ -273,7 +273,7 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V certificateFile.Write(certificateContent.data(), certificateContent.size()); NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); - NActors::TActorId proxyId = actorSystem.Register(proxy); + NActors::TActorId proxyId = actorSystem.Register(proxy); THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> add = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(port); ///////// https configuration @@ -281,13 +281,13 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V add->CertificateFile = certificateFile.Name(); add->PrivateKeyFile = certificateFile.Name(); ///////// - actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true); actorSystem.DispatchEvents(); - NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); + NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); - NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); + NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test"); actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); @@ -314,11 +314,11 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V NActors::TActorSystem actorSystem(setup); actorSystem.Start(); NHttp::THttpProxy* incomingProxy = new NHttp::THttpProxy(); - NActors::TActorId incomingProxyId = actorSystem.Register(incomingProxy); + NActors::TActorId incomingProxyId = actorSystem.Register(incomingProxy); actorSystem.Send(incomingProxyId, new NHttp::TEvHttpProxy::TEvAddListeningPort(13337)); NHttp::THttpProxy* outgoingProxy = new NHttp::THttpProxy(); - NActors::TActorId outgoingProxyId = actorSystem.Register(outgoingProxy); + NActors::TActorId outgoingProxyId = actorSystem.Register(outgoingProxy); THolder<NHttp::THttpStaticStringRequest> httpRequest = MakeHolder<NHttp::THttpStaticStringRequest>("GET /test HTTP/1.1\r\n\r\n"); actorSystem.Send(outgoingProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest("[::]:13337", std::move(httpRequest))); diff --git a/library/cpp/actors/http/ya.make b/library/cpp/actors/http/ya.make index ade447be3f..7ce68b7a75 100644 --- a/library/cpp/actors/http/ya.make +++ b/library/cpp/actors/http/ya.make @@ -26,7 +26,7 @@ PEERDIR( library/cpp/actors/core library/cpp/actors/interconnect library/cpp/dns - library/cpp/monlib/metrics + library/cpp/monlib/metrics library/cpp/string_utils/quote ) diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index a7da62c3d7..8a46ffd535 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -107,29 +107,29 @@ namespace NActors { struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") - TEvHandshakeAsk(const TActorId& self, - const TActorId& peer, + TEvHandshakeAsk(const TActorId& self, + const TActorId& peer, ui64 counter) : Self(self) , Peer(peer) , Counter(counter) { } - const TActorId Self; - const TActorId Peer; + const TActorId Self; + const TActorId Peer; const ui64 Counter; }; struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck") - TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params) + TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params) : Self(self) , NextPacket(nextPacket) , Params(std::move(params)) {} - const TActorId Self; + const TActorId Self; const ui64 NextPacket; const TSessionParams Params; }; @@ -185,8 +185,8 @@ namespace NActors { TEvHandshakeDone( TIntrusivePtr<NInterconnect::TStreamSocket> socket, - const TActorId& peer, - const TActorId& self, + const TActorId& peer, + const TActorId& self, ui64 nextPacket, TAutoPtr<TProgramInfo>&& programInfo, TSessionParams params) @@ -200,8 +200,8 @@ namespace NActors { } TIntrusivePtr<NInterconnect::TStreamSocket> Socket; - const TActorId Peer; - const TActorId Self; + const TActorId Peer; + const TActorId Self; const ui64 NextPacket; TAutoPtr<TProgramInfo> ProgramInfo; const TSessionParams Params; @@ -319,10 +319,10 @@ namespace NActors { template <typename TContainer> TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) { - for (const TActorId& actorId : route) { + for (const TActorId& actorId : route) { auto* hop = Record.AddHops(); if (actorId) { - ActorIdToProto(actorId, hop->MutableNextHop()); + ActorIdToProto(actorId, hop->MutableNextHop()); } } Record.SetId(id); @@ -366,13 +366,13 @@ namespace NActors { }; struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> { - TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize) + TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize) : SessionID(sessionId) , BufferSize(outputBufferSize) { } - TActorId SessionID; + TActorId SessionID; ui64 BufferSize; }; diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h index f052a6e92e..225a5243fd 100644 --- a/library/cpp/actors/interconnect/interconnect.h +++ b/library/cpp/actors/interconnect/interconnect.h @@ -10,7 +10,7 @@ namespace NActors { TString SelfAddress; ui32 SelfPort; - TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time) + TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time) }; struct TInterconnectProxySetup: public TThrRefBase { @@ -41,12 +41,12 @@ namespace NActors { TIntrusivePtr<TInterconnectGlobalState> GlobalState; - virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls + virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls virtual TActorSetupCmd CreateAcceptor() = 0; }; struct TNameserverSetup { - TActorId ServiceID; + TActorId ServiceID; TIntrusivePtr<TInterconnectGlobalState> GlobalState; }; @@ -118,12 +118,12 @@ namespace NActors { }; struct TNodeRegistrarSetup { - TActorId ServiceID; + TActorId ServiceID; TIntrusivePtr<TInterconnectGlobalState> GlobalState; }; - TActorId GetNameserviceActorId(); + TActorId GetNameserviceActorId(); /** * Const table-lookup based name service diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 659a6a9e5c..e4a0ae3cda 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -1,6 +1,6 @@ #pragma once -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/event_load.h> #include <library/cpp/actors/util/rope.h> diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 81e0694da1..285709a00c 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -3,7 +3,7 @@ #include <library/cpp/actors/core/actorid.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/util/datetime.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/monlib/metrics/metric_registry.h> #include <util/generic/map.h> #include <util/generic/set.h> @@ -63,7 +63,7 @@ namespace NActors { typedef TMap<ui16, TChannelSettings> TChannelsConfig; using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title, - TActorSystem* actorSystem, const TActorId& actorId)>; + TActorSystem* actorSystem, const TActorId& actorId)>; using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>; @@ -71,13 +71,13 @@ namespace NActors { bool orange, bool red, TActorSystem* actorSystem)>; struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> { - TActorId NameserviceId; + TActorId NameserviceId; NMonitoring::TDynamicCounterPtr MonCounters; std::shared_ptr<NMonitoring::IMetricRegistry> Metrics; TChannelsConfig ChannelsConfig; TInterconnectSettings Settings; TRegisterMonPageCallback RegisterMonPage; - TActorId DestructorId; + TActorId DestructorId; std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize; TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024; TString ClusterUUID; diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp index e389e93688..224160d4b4 100644 --- a/library/cpp/actors/interconnect/interconnect_counters.cpp +++ b/library/cpp/actors/interconnect/interconnect_counters.cpp @@ -619,11 +619,11 @@ namespace { TotalBytesRead_ = createRate(Metrics_, "interconnect.total_bytes_read"); for (const char *reason : TDisconnectReason::Reasons) { - DisconnectByReason_[reason] = Metrics_->Rate( - NMonitoring::MakeLabels({ - {"sensor", "interconnect.disconnect_reason"}, - {"reason", reason}, - })); + DisconnectByReason_[reason] = Metrics_->Rate( + NMonitoring::MakeLabels({ + {"sensor", "interconnect.disconnect_reason"}, + {"reason", reason}, + })); } } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 51d1e607bc..9ede998d8e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -25,8 +25,8 @@ namespace NActors { struct TInitialPacket { struct { - TActorId SelfVirtualId; - TActorId PeerVirtualId; + TActorId SelfVirtualId; + TActorId PeerVirtualId; ui64 NextPacket; ui64 Version; } Header; @@ -34,7 +34,7 @@ namespace NActors { TInitialPacket() = default; - TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) { + TInitialPacket(const TActorId& self, const TActorId& peer, ui64 nextPacket, ui64 version) { Header.SelfVirtualId = self; Header.PeerVirtualId = peer; Header.NextPacket = nextPacket; @@ -79,8 +79,8 @@ namespace NActors { private: TInterconnectProxyCommon::TPtr Common; - TActorId SelfVirtualId; - TActorId PeerVirtualId; + TActorId SelfVirtualId; + TActorId PeerVirtualId; ui32 PeerNodeId = 0; ui64 NextPacketToPeer = 0; TMaybe<ui64> NextPacketFromPeer; // will be obtained from incoming initial packet @@ -102,7 +102,7 @@ namespace NActors { return IActor::INTERCONNECT_HANDSHAKE; } - THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, + THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors , Common(std::move(common)) @@ -377,7 +377,7 @@ namespace NActors { // set up virtual self id to ensure peer will not drop our connection char buf[12] = {'c', 'o', 'o', 'k', 'i', 'e', ' ', 'c', 'h', 'e', 'c', 'k'}; - SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12)); + SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12)); bool success = true; try { @@ -401,7 +401,7 @@ namespace NActors { request.SetProgramStartTime(0); request.SetSerial(0); request.SetReceiverNodeId(0); - request.SetSenderActorId(TString()); + request.SetSenderActorId(TString()); request.SetCookie(cookie); request.SetDoCheckCookie(true); SendExBlock(request, "SendExBlockDoCheckCookie"); @@ -419,7 +419,7 @@ namespace NActors { } // restore state - SelfVirtualId = TActorId(); + SelfVirtualId = TActorId(); std::swap(tempSocket, Socket); std::swap(tempPollerToken, PollerToken); return success; @@ -455,7 +455,7 @@ namespace NActors { request.SetProgramStartTime(Common->StartTime); request.SetSerial(SelfVirtualId.LocalId()); request.SetReceiverNodeId(PeerNodeId); - request.SetSenderActorId(SelfVirtualId.ToString()); + request.SetSenderActorId(SelfVirtualId.ToString()); request.SetSenderHostName(Common->TechnicalSelfHostName); request.SetReceiverHostName(PeerHostName); @@ -519,7 +519,7 @@ namespace NActors { ValidateClusterUUID(success, generateError); ValidateVersionTag(success, generateError); - const auto& s = success.GetSenderActorId(); + const auto& s = success.GetSenderActorId(); PeerVirtualId.Parse(s.data(), s.size()); // recover flags @@ -599,8 +599,8 @@ namespace NActors { SendInitialPacket(); } else { // peer wants a new session, clear fields and send initial packet - SelfVirtualId = TActorId(); - PeerVirtualId = TActorId(); + SelfVirtualId = TActorId(); + PeerVirtualId = TActorId(); NextPacketToPeer = 0; SendInitialPacket(); @@ -637,7 +637,7 @@ namespace NActors { PeerHostName = request.GetSenderHostName(); // parse peer virtual id - const auto& str = request.GetSenderActorId(); + const auto& str = request.GetSenderActorId(); PeerVirtualId.Parse(str.data(), str.size()); // validate request @@ -709,7 +709,7 @@ namespace NActors { SendExBlock(record, "ExReply"); // extract sender actor id (self virtual id) - const auto& str = success.GetSenderActorId(); + const auto& str = success.GetSenderActorId(); SelfVirtualId.Parse(str.data(), str.size()); } else if (auto ev = reply->CastAsLocal<TEvHandshakeReplyError>()) { // in case of error just send reply to the peer and terminate handshake @@ -981,8 +981,8 @@ namespace NActors { } }; - IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, - const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, + IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, + const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) { return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket, std::move(peerHostName), std::move(params))); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index 7c5c25c3b8..b3c0db6c5d 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -15,8 +15,8 @@ namespace NActors { using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; - IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, - const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, + IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, + const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params); IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket); diff --git a/library/cpp/actors/interconnect/interconnect_impl.h b/library/cpp/actors/interconnect/interconnect_impl.h index 2ca0db8763..ee29e4d397 100644 --- a/library/cpp/actors/interconnect/interconnect_impl.h +++ b/library/cpp/actors/interconnect/interconnect_impl.h @@ -4,7 +4,7 @@ #include <library/cpp/actors/protos/interconnect.pb.h> #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/helpers/mon_histogram_helper.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> namespace NActors { // resolve node info diff --git a/library/cpp/actors/interconnect/interconnect_mon.cpp b/library/cpp/actors/interconnect/interconnect_mon.cpp index 48823c5b0e..cf924ccbf9 100644 --- a/library/cpp/actors/interconnect/interconnect_mon.cpp +++ b/library/cpp/actors/interconnect/interconnect_mon.cpp @@ -1,9 +1,9 @@ #include "interconnect_mon.h" #include "interconnect_tcp_proxy.h" - -#include <library/cpp/json/json_value.h> -#include <library/cpp/json/json_writer.h> -#include <library/cpp/monlib/service/pages/templates.h> + +#include <library/cpp/json/json_value.h> +#include <library/cpp/json/json_writer.h> +#include <library/cpp/monlib/service/pages/templates.h> #include <openssl/ssl.h> #include <openssl/pem.h> @@ -14,7 +14,7 @@ namespace NInterconnect { class TInterconnectMonActor : public TActor<TInterconnectMonActor> { class TQueryProcessor : public TActorBootstrapped<TQueryProcessor> { - const TActorId Sender; + const TActorId Sender; const bool Json; TMap<ui32, TInterconnectProxyTCP::TProxyStats> Stats; ui32 PendingReplies = 0; @@ -24,7 +24,7 @@ namespace NInterconnect { return INTERCONNECT_MONACTOR; } - TQueryProcessor(const TActorId& sender, bool json) + TQueryProcessor(const TActorId& sender, bool json) : Sender(sender) , Json(json) {} diff --git a/library/cpp/actors/interconnect/interconnect_mon.h b/library/cpp/actors/interconnect/interconnect_mon.h index e78229a2c4..3fb26053fb 100644 --- a/library/cpp/actors/interconnect/interconnect_mon.h +++ b/library/cpp/actors/interconnect/interconnect_mon.h @@ -7,9 +7,9 @@ namespace NInterconnect { NActors::IActor *CreateInterconnectMonActor(TIntrusivePtr<NActors::TInterconnectProxyCommon> common = nullptr); - static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) { + static inline NActors::TActorId MakeInterconnectMonActorId(ui32 nodeId) { char s[12] = {'I', 'C', 'O', 'v', 'e', 'r', 'v', 'i', 'e', 'w', 0, 0}; - return NActors::TActorId(nodeId, TStringBuf(s, 12)); + return NActors::TActorId(nodeId, TStringBuf(s, 12)); } } // NInterconnect diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp index c9f6f8b5dc..43419bf70d 100644 --- a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp +++ b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp @@ -79,8 +79,8 @@ namespace NActors { return true; } - TActorId GetNameserviceActorId() { - return TActorId(0, "namesvc"); + TActorId GetNameserviceActorId() { + return TActorId(0, "namesvc"); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index b42ae8dffd..0abe9fe659 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -6,7 +6,7 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, + TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed, TDuration deadPeerTimeout, TSessionParams params) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 4191951abd..7e2d8ccb94 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -3,7 +3,7 @@ #include "interconnect_tcp_session.h" #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/monlib/service/pages/templates.h> #include <util/system/getpid.h> namespace NActors { @@ -45,7 +45,7 @@ namespace NActors { LOG_INFO_IC("ICP01", "ready to work"); } - void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) { + void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) { if (!DynamicPtr) { // perform usual bootstrap for static nodes sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); @@ -311,9 +311,9 @@ namespace NActors { auto event = MakeHolder<TEvHandshakeReplyOK>(); auto* pb = event->Record.MutableSuccess(); - const TActorId virtualId = GenerateSessionVirtualId(); + const TActorId virtualId = GenerateSessionVirtualId(); pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION); - pb->SetSenderActorId(virtualId.ToString()); + pb->SetSenderActorId(virtualId.ToString()); pb->SetProgramPID(GetPID()); pb->SetProgramStartTime(Common->StartTime); pb->SetSerial(virtualId.LocalId()); @@ -536,14 +536,14 @@ namespace NActors { SessionVirtualId.ToString().data()); Session = nullptr; - SessionID = TActorId(); + SessionID = TActorId(); // drop all pending events as we are closed ProcessPendingSessionEvents(); // reset virtual ids as this session is terminated - SessionVirtualId = TActorId(); - RemoteSessionVirtualId = TActorId(); + SessionVirtualId = TActorId(); + RemoteSessionVirtualId = TActorId(); if (Metrics) { Metrics->IncSessionDeaths(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index e5921134ed..023e5bd1ee 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -4,7 +4,7 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/core/events.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include "interconnect_common.h" #include "interconnect_counters.h" @@ -70,7 +70,7 @@ namespace NActors { } void Bootstrap(); - void Registered(TActorSystem* sys, const TActorId& owner) override; + void Registered(TActorSystem* sys, const TActorId& owner) override; private: friend class TInterconnectSessionTCP; @@ -366,7 +366,7 @@ namespace NActors { // read only TInterconnectProxyCommon::TPtr const Common; - const TActorId& GetNameserviceId() const { + const TActorId& GetNameserviceId() const { return Common->NameserviceId; } @@ -403,24 +403,24 @@ namespace NActors { void DropSessionEvent(STATEFN_SIG); TInterconnectSessionTCP* Session = nullptr; - TActorId SessionID; + TActorId SessionID; // virtual ids used during handshake to check if it is the connection // for the same session or to find out the latest shandshake // it's virtual because session actor apears after successfull handshake - TActorId SessionVirtualId; - TActorId RemoteSessionVirtualId; + TActorId SessionVirtualId; + TActorId RemoteSessionVirtualId; - TActorId GenerateSessionVirtualId() { + TActorId GenerateSessionVirtualId() { ICPROXY_PROFILED; const ui64 localId = TlsActivationContext->ExecutorThread.ActorSystem->AllocateIDSpace(1); - return NActors::TActorId(SelfId().NodeId(), 0, localId, 0); + return NActors::TActorId(SelfId().NodeId(), 0, localId, 0); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TActorId IncomingHandshakeActor; + TActorId IncomingHandshakeActor; TInstant IncomingHandshakeActorFilledIn; TInstant IncomingHandshakeActorReset; TMaybe<ui64> LastSerialFromIncomingHandshake; @@ -429,7 +429,7 @@ namespace NActors { void DropIncomingHandshake(bool poison = true) { ICPROXY_PROFILED; - if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) { + if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) { LOG_DEBUG_IC("ICP111", "dropped incoming handshake: %s poison: %s", actorId.ToString().data(), poison ? "true" : "false"); if (poison) { @@ -444,7 +444,7 @@ namespace NActors { void DropOutgoingHandshake(bool poison = true) { ICPROXY_PROFILED; - if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) { + if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) { LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(), poison ? "true" : "false"); if (poison) { @@ -477,12 +477,12 @@ namespace NActors { SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection); } - void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId, + void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId, THolder<IEventBase> event); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TActorId OutgoingHandshakeActor; + TActorId OutgoingHandshakeActor; TInstant OutgoingHandshakeActorCreated; TInstant OutgoingHandshakeActorReset; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index 2c025dc389..b95c994598 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -23,7 +23,7 @@ namespace NActors { } } - TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { + TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h index 086fe26ab3..fc71073c2d 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h @@ -34,7 +34,7 @@ namespace NActors { } } - TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override; + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override; void Die(const TActorContext& ctx) override; @@ -50,8 +50,8 @@ namespace NActors { TInterconnectProxyCommon::TPtr const ProxyCommonCtx; }; - static inline TActorId MakeInterconnectListenerActorId(bool dynamic) { + static inline TActorId MakeInterconnectListenerActorId(bool dynamic) { char x[12] = {'I', 'C', 'L', 'i', 's', 't', 'e', 'n', 'e', 'r', '/', dynamic ? 'D' : 'S'}; - return TActorId(0, TStringBuf(x, 12)); + return TActorId(0, TStringBuf(x, 12)); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 468e8bdd64..2ded7f9f53 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -7,7 +7,7 @@ #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/monlib/service/pages/templates.h> namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); @@ -474,7 +474,7 @@ namespace NActors { if (ev->Sender == ReceiverId) { const bool wasConnected(Socket); LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data()); - ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet + ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet if (wasConnected) { // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should // restart handshake process, closing our part of socket first diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index dfab4065c0..7fc00dbcc5 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -10,7 +10,7 @@ #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/util/funnel_queue.h> #include <library/cpp/actors/util/recentwnd.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <util/generic/queue.h> @@ -179,7 +179,7 @@ namespace NActors { return INTERCONNECT_SESSION_TCP; } - TInputSessionTCP(const TActorId& sessionId, + TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, @@ -495,7 +495,7 @@ namespace NActors { void GenerateHttpInfo(TStringStream& str); TIntrusivePtr<TReceiveContext> ReceiveContext; - TActorId ReceiverId; + TActorId ReceiverId; TDuration Ping; ui64 ConfirmPacketsForcedBySize = 0; @@ -513,7 +513,7 @@ namespace NActors { : public TActorBootstrapped<TInterconnectSessionKiller> { ui32 RepliesReceived = 0; ui32 RepliesNumber = 0; - TActorId LargestSession = TActorId(); + TActorId LargestSession = TActorId(); ui64 MaxBufferSize = 0; TInterconnectProxyCommon::TPtr Common; @@ -529,7 +529,7 @@ namespace NActors { void Bootstrap() { auto sender = SelfId(); - const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { + const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { auto ev = new TEvSessionBufferSizeRequest(); return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); }; diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index 22850b3126..2a8443da71 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -72,7 +72,7 @@ namespace NInterconnect { }; class TLoadResponderMasterActor : public TActorBootstrapped<TLoadResponderMasterActor> { - TVector<TActorId> Slaves; + TVector<TActorId> Slaves; ui32 SlaveIndex = 0; STRICT_STFUNC(StateFunc, @@ -93,7 +93,7 @@ namespace NInterconnect { } void Die(const TActorContext& ctx) override { - for (const TActorId& actorId : Slaves) { + for (const TActorId& actorId : Slaves) { ctx.Send(actorId, new TEvents::TEvPoisonPill); } TActorBootstrapped::Die(ctx); @@ -122,9 +122,9 @@ namespace NInterconnect { return new TLoadResponderMasterActor(); } - TActorId MakeLoadResponderActorId(ui32 nodeId) { + TActorId MakeLoadResponderActorId(ui32 nodeId) { char x[12] = {'I', 'C', 'L', 'o', 'a', 'd', 'R', 'e', 's', 'p', 'A', 'c'}; - return TActorId(nodeId, TStringBuf(x, 12)); + return TActorId(nodeId, TStringBuf(x, 12)); } class TLoadActor: public TActorBootstrapped<TLoadActor> { @@ -144,8 +144,8 @@ namespace NInterconnect { TInstant NextMessageTimestamp; THashMap<TString, TMessageInfo> InFly; ui64 NextId = 1; - TVector<TActorId> Hops; - TActorId FirstHop; + TVector<TActorId> Hops; + TActorId FirstHop; ui64 NumDropped = 0; std::shared_ptr<std::atomic_uint64_t> Traffic; @@ -167,7 +167,7 @@ namespace NInterconnect { Traffic = std::move(ev->Get()->Traffic); for (const ui32 nodeId : Params.NodeHops) { - const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId(); + const TActorId& actorId = nodeId ? MakeLoadResponderActorId(nodeId) : TActorId(); if (!FirstHop) { FirstHop = actorId; } else { diff --git a/library/cpp/actors/interconnect/load.h b/library/cpp/actors/interconnect/load.h index 060fa7641b..0a01a0dc04 100644 --- a/library/cpp/actors/interconnect/load.h +++ b/library/cpp/actors/interconnect/load.h @@ -5,7 +5,7 @@ namespace NInterconnect { // load responder -- lives on every node as a service actor NActors::IActor* CreateLoadResponderActor(); - NActors::TActorId MakeLoadResponderActorId(ui32 node); + NActors::TActorId MakeLoadResponderActorId(ui32 node); // load actor -- generates load with specific parameters struct TLoadParams { diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp index 1267920559..884503e602 100644 --- a/library/cpp/actors/interconnect/mock/ic_mock.cpp +++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp @@ -42,7 +42,7 @@ namespace NActors { : Key(key) {} - void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) { + void Attach(ui32 nodeId, TActorSystem *as, const TActorId& actorId) { TPeerInfo *peer = GetPeer(nodeId); auto guard = TWriteGuard(peer->Mutex); Y_VERIFY(!peer->ActorSystem); @@ -188,7 +188,7 @@ namespace NActors { , Common(std::move(common)) {} - void Registered(TActorSystem *as, const TActorId& parent) override { + void Registered(TActorSystem *as, const TActorId& parent) override { TActor::Registered(as, parent); State.Attach(NodeId, as, SelfId()); } diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 187d0b6bdf..4ba50a2b5f 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -18,7 +18,7 @@ using NActors::IEventBase; using NActors::IEventHandle; -using NActors::TActorId; +using NActors::TActorId; using NActors::TConstIoVec; using NActors::TEventSerializedData; @@ -91,8 +91,8 @@ union TTcpPacketBuf { struct TEventDescr { ui32 Type; ui32 Flags; - TActorId Recipient; - TActorId Sender; + TActorId Recipient; + TActorId Sender; ui64 Cookie; // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor NWilson::TTraceId::TSerializedTraceId TraceId; @@ -102,7 +102,7 @@ struct TEventDescr { struct TEventHolder : TNonCopyable { TEventDescr Descr; - TActorId ForwardRecipient; + TActorId ForwardRecipient; THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; ui64 Serial; diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index 8c7b61a7a7..e75cbcaef4 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -1,35 +1,35 @@ #include "poller_actor.h" #include "interconnect_common.h" -#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/core/probes.h> -#include <library/cpp/actors/protos/services_common.pb.h> +#include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/actors/util/funnel_queue.h> - + #include <util/generic/intrlist.h> #include <util/system/thread.h> #include <util/system/event.h> #include <util/system/pipe.h> #include <variant> - + namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - namespace { + namespace { int LastSocketError() { #if defined(_win_) return WSAGetLastError(); #else return errno; #endif - } - } - + } + } + struct TSocketRecord : TThrRefBase { const TIntrusivePtr<TSharedDescriptor> Socket; const TActorId ReadActorId; @@ -57,7 +57,7 @@ namespace NActors { : Socket(std::move(socket)) {} }; - + using TPollerSyncOperation = std::variant<TPollerExitThread, TPollerWakeup, TPollerUnregisterSocket>; struct TPollerSyncOperationWrapper { @@ -149,7 +149,7 @@ namespace NActors { bool DrainReadEnd() { size_t totalRead = 0; char buffer[4096]; - for (;;) { + for (;;) { ssize_t n = ReadEnd.Read(buffer, sizeof(buffer)); if (n < 0) { const int error = LastSocketError(); @@ -157,17 +157,17 @@ namespace NActors { continue; } else if (error == EAGAIN || error == EWOULDBLOCK) { break; - } else { + } else { Y_FAIL("read() failed with %s", strerror(errno)); - } + } } else { Y_VERIFY(n); totalRead += n; - } + } } return totalRead; } - + bool ProcessSyncOpQueue() { if (DrainReadEnd()) { Y_VERIFY(!SyncOperationsQ.IsEmpty()); @@ -181,25 +181,25 @@ namespace NActors { return false; // terminate the thread } else if (std::get_if<TPollerWakeup>(&op->Operation)) { op->SignalDone(); - } else { + } else { Y_FAIL(); - } + } } while (SyncOperationsQ.Pop()); - } + } return true; - } - + } + void *ThreadProc() override { SetCurrentThreadName("network poller"); while (ProcessSyncOpQueue()) { static_cast<TDerived&>(*this).ProcessEventsInLoop(); - } + } return nullptr; - } + } }; - + } // namespace NActors - + #if defined(_linux_) # include "poller_actor_linux.h" #elif defined(_darwin_) @@ -209,38 +209,38 @@ namespace NActors { #else # error "Unsupported platform" #endif - + namespace NActors { - + class TPollerToken::TImpl { std::weak_ptr<TPollerThread> Thread; TIntrusivePtr<TSocketRecord> Record; // valid only when Thread is held locked - - public: + + public: TImpl(std::shared_ptr<TPollerThread> thread, TIntrusivePtr<TSocketRecord> record) : Thread(thread) , Record(std::move(record)) - { + { thread->RegisterSocket(Record); } - + ~TImpl() { if (auto thread = Thread.lock()) { thread->UnregisterSocket(Record); - } - } - + } + } + void Request(bool read, bool write) { if (auto thread = Thread.lock()) { thread->Request(Record, read, write); - } - } + } + } const TIntrusivePtr<TSharedDescriptor>& Socket() const { return Record->Socket; } - }; - + }; + class TPollerActor: public TActorBootstrapped<TPollerActor> { // poller thread std::shared_ptr<TPollerThread> PollerThread; diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h index 5bd4f50704..f927b82089 100644 --- a/library/cpp/actors/interconnect/poller_actor.h +++ b/library/cpp/actors/interconnect/poller_actor.h @@ -55,9 +55,9 @@ namespace NActors { IActor* CreatePollerActor(); - inline TActorId MakePollerActorId() { + inline TActorId MakePollerActorId() { char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); } } diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index bbdabbd339..565a511859 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index 334859882f..e6b2bd4e4c 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -2,7 +2,7 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/interconnect/interconnect_common.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/interconnect/event_holder_pool.h> #include <atomic> diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp index d67509f058..ba2a50c6f6 100644 --- a/library/cpp/actors/interconnect/ut/large.cpp +++ b/library/cpp/actors/interconnect/ut/large.cpp @@ -14,10 +14,10 @@ Y_UNIT_TEST_SUITE(LargeMessage) { using namespace NActors; class TProducer: public TActorBootstrapped<TProducer> { - const TActorId RecipientActorId; + const TActorId RecipientActorId; public: - TProducer(const TActorId& recipientActorId) + TProducer(const TActorId& recipientActorId) : RecipientActorId(recipientActorId) {} @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) { class TConsumer : public TActorBootstrapped<TConsumer> { TManualEvent& Done; - TActorId SessionId; + TActorId SessionId; public: TConsumer(TManualEvent& done) @@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) { TManualEvent done; TConsumer* consumer = new TConsumer(done); - const TActorId recp = testCluster.RegisterActor(consumer, 1); + const TActorId recp = testCluster.RegisterActor(consumer, 1); testCluster.RegisterActor(new TProducer(recp), 2); done.WaitI(); } diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index ac46180804..2b6d27cd3f 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -70,7 +70,7 @@ public: ~TTestICCluster() { } - TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { + TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { return Nodes[nodeId]->RegisterActor(actor); } @@ -78,7 +78,7 @@ public: return Nodes[nodeId]->InterconnectProxy(peerNodeId); } - void KillActor(ui32 nodeId, const TActorId& id) { + void KillActor(ui32 nodeId, const TActorId& id) { Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); } }; diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index 59dd2554c8..ff30b1445e 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -62,7 +62,7 @@ public: setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); - const TActorId loggerActorId(0, "logger"); + const TActorId loggerActorId(0, "logger"); constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER auto loggerSettings = MakeIntrusive<NLog::TSettings>( @@ -114,7 +114,7 @@ public: ActorSystem->Stop(); } - bool Send(const TActorId& recipient, IEventBase* ev) { + bool Send(const TActorId& recipient, IEventBase* ev) { return ActorSystem->Send(recipient, ev); } @@ -127,7 +127,7 @@ public: } void RegisterServiceActor(const TActorId& serviceId, IActor* actor) { - const TActorId actorId = ActorSystem->Register(actor); + const TActorId actorId = ActorSystem->Register(actor); ActorSystem->RegisterLocalService(serviceId, actorId); } diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h index 07fe10d93a..7591200471 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_actors.h +++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h @@ -3,13 +3,13 @@ namespace NActors { class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> { protected: - const TActorId RecipientActorId; + const TActorId RecipientActorId; const ui32 Preload; ui64 SequenceNumber = 0; ui32 InFlySize = 0; public: - TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) + TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) : RecipientActorId(recipientActorId) , Preload(preload) { diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp index dbd05ce746..23d846a2fd 100644 --- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp +++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp @@ -1,38 +1,38 @@ -#include <library/cpp/actors/interconnect/poller_actor.h> -#include <library/cpp/actors/testlib/test_runtime.h> - +#include <library/cpp/actors/interconnect/poller_actor.h> +#include <library/cpp/actors/testlib/test_runtime.h> + #include <library/cpp/testing/unittest/registar.h> - -#include <util/network/pair.h> -#include <util/network/socket.h> - -using namespace NActors; - -class TTestSocket: public TSharedDescriptor { -public: - explicit TTestSocket(SOCKET fd) - : Fd_(fd) - { - } - - int GetDescriptor() override { - return Fd_; - } - -private: - SOCKET Fd_; -}; -using TTestSocketPtr = TIntrusivePtr<TTestSocket>; - -// create pair of connected, non-blocking sockets -std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() { - SOCKET fds[2]; - SocketPair(fds); - SetNonBlock(fds[0]); - SetNonBlock(fds[1]); - return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])}; -} - + +#include <util/network/pair.h> +#include <util/network/socket.h> + +using namespace NActors; + +class TTestSocket: public TSharedDescriptor { +public: + explicit TTestSocket(SOCKET fd) + : Fd_(fd) + { + } + + int GetDescriptor() override { + return Fd_; + } + +private: + SOCKET Fd_; +}; +using TTestSocketPtr = TIntrusivePtr<TTestSocket>; + +// create pair of connected, non-blocking sockets +std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() { + SOCKET fds[2]; + SocketPair(fds); + SetNonBlock(fds[0]); + SetNonBlock(fds[1]); + return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])}; +} + std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() { // create server (listening) socket SOCKET server = socket(AF_INET, SOCK_STREAM, 0); @@ -74,101 +74,101 @@ std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() { return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted)); } -class TPollerActorTest: public TTestBase { - UNIT_TEST_SUITE(TPollerActorTest); - UNIT_TEST(Registration) - UNIT_TEST(ReadNotification) - UNIT_TEST(WriteNotification) - UNIT_TEST(HangupNotification) - UNIT_TEST_SUITE_END(); - -public: - void SetUp() override { - ActorSystem_ = MakeHolder<TTestActorRuntimeBase>(); - ActorSystem_->Initialize(); - - PollerId_ = ActorSystem_->Register(CreatePollerActor()); - - TDispatchOptions opts; - opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); - ActorSystem_->DispatchEvents(opts); - } - - void Registration() { - auto [s1, s2] = NonBlockSockets(); - auto readerId = ActorSystem_->AllocateEdgeActor(); - auto writerId = ActorSystem_->AllocateEdgeActor(); - +class TPollerActorTest: public TTestBase { + UNIT_TEST_SUITE(TPollerActorTest); + UNIT_TEST(Registration) + UNIT_TEST(ReadNotification) + UNIT_TEST(WriteNotification) + UNIT_TEST(HangupNotification) + UNIT_TEST_SUITE_END(); + +public: + void SetUp() override { + ActorSystem_ = MakeHolder<TTestActorRuntimeBase>(); + ActorSystem_->Initialize(); + + PollerId_ = ActorSystem_->Register(CreatePollerActor()); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); + ActorSystem_->DispatchEvents(opts); + } + + void Registration() { + auto [s1, s2] = NonBlockSockets(); + auto readerId = ActorSystem_->AllocateEdgeActor(); + auto writerId = ActorSystem_->AllocateEdgeActor(); + RegisterSocket(s1, readerId, writerId); - - // reader should receive event after socket registration + + // reader should receive event after socket registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId); token = ev->Get()->PollerToken; - } - - // writer should receive event after socket registration - { + } + + // writer should receive event after socket registration + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId); UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken); - } - } - - void ReadNotification() { - auto [r, w] = NonBlockSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + } + } + + void ReadNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); RegisterSocket(r, clientId, {}); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - - char buf; - - // data not ready yet for read - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); - UNIT_ASSERT(errno == EWOULDBLOCK); - + } + + char buf; + + // data not ready yet for read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + // request read poll token->Request(true, false); - // write data - UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1); - - // notification after socket become readable - { + // write data + UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1); + + // notification after socket become readable + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); UNIT_ASSERT(ev->Get()->Read); UNIT_ASSERT(!ev->Get()->Write); - } - - // read data - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1); - UNIT_ASSERT_EQUAL('x', buf); - - // no more data to read - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); - UNIT_ASSERT(errno == EWOULDBLOCK); - } - - void WriteNotification() { + } + + // read data + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1); + UNIT_ASSERT_EQUAL('x', buf); + + // no more data to read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + } + + void WriteNotification() { auto [r, w] = TcpSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + auto clientId = ActorSystem_->AllocateEdgeActor(); SetNonBlock(w->GetDescriptor()); RegisterSocket(w, TActorId{}, clientId); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - + } + char buffer[4096]; memset(buffer, 'x', sizeof(buffer)); @@ -181,7 +181,7 @@ public: written += res; } else if (res == 0) { UNIT_FAIL("unexpected zero return from send()"); - } else { + } else { UNIT_ASSERT(res == -1); if (errno == EINTR) { continue; @@ -191,10 +191,10 @@ public: } else { UNIT_FAIL("unexpected error from send()"); } - } - } + } + } Cerr << "written " << written << " bytes" << Endl; - + // read all written data from the read end for (;;) { char buffer[4096]; @@ -216,7 +216,7 @@ public: } } } - + // wait for notification after socket becomes writable again { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); @@ -224,41 +224,41 @@ public: UNIT_ASSERT(!ev->Get()->Read); UNIT_ASSERT(ev->Get()->Write); } - } - } - - void HangupNotification() { - auto [r, w] = NonBlockSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + } + } + + void HangupNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); RegisterSocket(r, clientId, TActorId{}); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - + } + token->Request(true, false); ShutDown(w->GetDescriptor(), SHUT_RDWR); - + // notification after peer shuts down its socket - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); UNIT_ASSERT(ev->Get()->Read); - } - } - -private: + } + } + +private: void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) { auto ev = new TEvPollerRegister{socket, readActorId, writeActorId}; - ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); - } - -private: - THolder<TTestActorRuntimeBase> ActorSystem_; - TActorId PollerId_; -}; - -UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest); + ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); + } + +private: + THolder<TTestActorRuntimeBase> ActorSystem_; + TActorId PollerId_; +}; + +UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest); diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make index ec19f1a64a..2f5b13352e 100644 --- a/library/cpp/actors/interconnect/ut/ya.make +++ b/library/cpp/actors/interconnect/ut/ya.make @@ -15,11 +15,11 @@ ELSE() ENDIF() SRCS( - channel_scheduler_ut.cpp + channel_scheduler_ut.cpp event_holder_pool_ut.cpp interconnect_ut.cpp large.cpp - poller_actor_ut.cpp + poller_actor_ut.cpp dynamic_proxy_ut.cpp ) @@ -28,7 +28,7 @@ PEERDIR( library/cpp/actors/interconnect library/cpp/actors/interconnect/ut/lib library/cpp/actors/interconnect/ut/protos - library/cpp/actors/testlib + library/cpp/actors/testlib library/cpp/digest/md5 library/cpp/testing/unittest ) diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp index 69374cd080..5d19bc3003 100644 --- a/library/cpp/actors/interconnect/ut_fat/main.cpp +++ b/library/cpp/actors/interconnect/ut_fat/main.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { ui16 SendFlags; public: - TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) + TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) : TSenderBaseActor(recipientActorId, 32) , SendFlags(sendFlags) { @@ -108,7 +108,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); - const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); TSenderActor* senderActor = new TSenderActor(recipient, flags); testCluster.RegisterActor(senderActor, 1); @@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); - const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); TSenderActor* senderActor = new TSenderActor(recipient, flags); testCluster.RegisterActor(senderActor, 1); diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make index 9e4fb46fdb..60d29b0fc0 100644 --- a/library/cpp/actors/interconnect/ya.make +++ b/library/cpp/actors/interconnect/ya.make @@ -75,18 +75,18 @@ PEERDIR( contrib/libs/libc_compat contrib/libs/openssl library/cpp/actors/core - library/cpp/actors/dnscachelib + library/cpp/actors/dnscachelib library/cpp/actors/dnsresolver library/cpp/actors/helpers library/cpp/actors/prof library/cpp/actors/protos library/cpp/actors/util library/cpp/digest/crc32c - library/cpp/json + library/cpp/json library/cpp/lwtrace - library/cpp/monlib/dynamic_counters + library/cpp/monlib/dynamic_counters library/cpp/monlib/metrics - library/cpp/monlib/service/pages/tablesorter + library/cpp/monlib/service/pages/tablesorter library/cpp/openssl/init library/cpp/packedtypes ) diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto index 8155535f1f..5fbd6d44ee 100644 --- a/library/cpp/actors/protos/actors.proto +++ b/library/cpp/actors/protos/actors.proto @@ -2,12 +2,12 @@ package NActorsProto; option java_package = "ru.yandex.kikimr.proto"; option java_outer_classname = "NActorsBaseProto"; -message TActorId { +message TActorId { required fixed64 RawX1 = 1; required fixed64 RawX2 = 2; } message TCallbackException { - required TActorId ActorId = 1; + required TActorId ActorId = 1; required string ExceptionMessage = 2; } diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 8656c8ea1f..2e3b0d0d15 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -51,7 +51,7 @@ message THandshakeRequest { required uint64 Serial = 4; required uint32 ReceiverNodeId = 5; - required string SenderActorId = 6; + required string SenderActorId = 6; optional string SenderHostName = 7; optional string ReceiverHostName = 8; @@ -81,7 +81,7 @@ message THandshakeSuccess { required uint64 ProgramStartTime = 3; required uint64 Serial = 4; - required string SenderActorId = 5; + required string SenderActorId = 5; optional string VersionTag = 6; repeated string AcceptedVersionTags = 7; @@ -104,7 +104,7 @@ message THandshakeReply { message TEvLoadMessage { message THop { - optional NActorsProto.TActorId NextHop = 1; // if zero, then the payload is trimmed out of the message + optional NActorsProto.TActorId NextHop = 1; // if zero, then the payload is trimmed out of the message } repeated THop Hops = 1; // the route for the message diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index cda6980b1e..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -295,17 +295,17 @@ namespace NActors { void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(deadline, ev, cookie, workerId); - } - + } + void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId); } void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { - TInstant deadline = Runtime->GetTimeProvider()->Now() + delay; + TInstant deadline = Runtime->GetTimeProvider()->Now() + delay; DoSchedule(deadline, ev, cookie, workerId); - } - + } + void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) { Y_UNUSED(workerId); @@ -319,13 +319,13 @@ namespace NActors { Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); } - - auto now = Runtime->GetTimeProvider()->Now(); - if (deadline < now) { - deadline = now; // avoid going backwards in time - } - TDuration delay = (deadline - now); - + + auto now = Runtime->GetTimeProvider()->Now(); + if (deadline < now) { + deadline = now; // avoid going backwards in time + } + TDuration delay = (deadline - now); + if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) { ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); @@ -336,9 +336,9 @@ namespace NActors { if (cookie) { cookie->Detach(); } - if (verbose) { + if (verbose) { Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n"; - } + } } } @@ -366,8 +366,8 @@ namespace NActors { ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { - const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); - TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); + const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); + TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); if (ev->GetRecipientRewrite() == logger) { TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); @@ -403,7 +403,7 @@ namespace NActors { } TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, - const TActorId& parentId) override { + const TActorId& parentId) override { return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId); } @@ -486,7 +486,7 @@ namespace NActors { } void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { - const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); + const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); node->LogSettings->SetAllowDrop(false); @@ -579,7 +579,7 @@ namespace NActors { } - void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { + void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); runtime.ScheduleWhiteListParent[actorId] = parentId; @@ -640,7 +640,7 @@ namespace NActors { TInstant time = scheduledEvents.begin()->Deadline; while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) { - static THashMap<std::pair<TActorId, TString>, ui64> eventTypes; + static THashMap<std::pair<TActorId, TString>, ui64> eventTypes; auto& item = *scheduledEvents.begin(); TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type); eventTypes[std::make_pair(item.Event->Recipient, name)]++; @@ -725,7 +725,7 @@ namespace NActors { VERBOSE = verbose; } - void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { + void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { Y_VERIFY(!IsInitialized); Y_VERIFY(nodeIndex < NodeCount); auto node = Nodes[nodeIndex + FirstNodeId]; @@ -857,8 +857,8 @@ namespace NActors { return (*TmpDir)(); } - TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType, - ui64 revolvingCounter, const TActorId& parentId) { + TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType, + ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); @@ -897,7 +897,7 @@ namespace NActors { mailbox->AttachActor(localActorId, actor); // do init - const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); + const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); @@ -925,8 +925,8 @@ namespace NActors { return actorId; } - TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, - const TActorId& parentId) { + TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, + const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); @@ -941,7 +941,7 @@ namespace NActors { } mailbox->AttachActor(localActorId, actor); - const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); + const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); @@ -949,7 +949,7 @@ namespace NActors { return actorId; } - TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { + TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); @@ -959,13 +959,13 @@ namespace NActors { node->ActorToActorId[actor] = actorId; } - return node->ActorSystem->RegisterLocalService(serviceId, actorId); + return node->ActorSystem->RegisterLocalService(serviceId, actorId); } - TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) { + TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); + TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); EdgeActors.insert(edgeActor); EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor; return edgeActor; @@ -1414,14 +1414,14 @@ namespace NActors { return it->second; } - TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { + TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); return node->ActorSystem->LookupLocalService(serviceId); } - void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) { + void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) { TGuard<TMutex> guard(Mutex); ui32 dispatchCount = 0; if (!edgeFilter.empty()) { @@ -1429,7 +1429,7 @@ namespace NActors { Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data()); } } - const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; + const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout; for (;;) { for (auto edgeActor : edgeActors) { @@ -1460,7 +1460,7 @@ namespace NActors { } } - TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) { + TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); @@ -1469,7 +1469,7 @@ namespace NActors { return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); } - void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) { + void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) { TGuard<TMutex> guard(Mutex); BlockedOutput.insert(actorId); } @@ -1480,7 +1480,7 @@ namespace NActors { DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); } - IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { TGuard<TMutex> guard(Mutex); if (nodeIndex == Max<ui32>()) { Y_VERIFY(actorId.NodeId()); @@ -1494,7 +1494,7 @@ namespace NActors { return FindActor(actorId, node); } - void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { + void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { TGuard<TMutex> guard(Mutex); if (allow) { if (VERBOSE) { @@ -1509,7 +1509,7 @@ namespace NActors { } } - bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const { + bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const { TGuard<TMutex> guard(Mutex); return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); } @@ -1570,7 +1570,7 @@ namespace NActors { IActor* recipientActor = mailbox->FindActor(recipientLocalId); if (recipientActor) { // Save actorId by value in order to prevent ctx from being invalidated during another Send call. - TActorId actorId = ev->GetRecipientRewrite(); + TActorId actorId = ev->GetRecipientRewrite(); node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId); TActivationContext *prevTlsActivationContext = TlsActivationContext; @@ -1585,7 +1585,7 @@ namespace NActors { recipientActor->Receive(evHolder, ctx); node->ExecutorThread->DropUnregistered(); } - CurrentRecipient = TActorId(); + CurrentRecipient = TActorId(); TlsActivationContext = prevTlsActivationContext; } else { if (VERBOSE) { @@ -1599,7 +1599,7 @@ namespace NActors { } } - IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const { + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const { ui32 mailboxHint = actorId.Hint(); ui64 localId = actorId.LocalId(); TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); @@ -1644,7 +1644,7 @@ namespace NActors { setup->LocalServices = node->LocalServices; setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); - const TActorId nameserviceId = GetNameserviceActorId(); + const TActorId nameserviceId = GetNameserviceActorId(); TIntrusivePtr<TInterconnectProxyCommon> common; common.Reset(new TInterconnectProxyCommon); @@ -1688,7 +1688,7 @@ namespace NActors { NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); - std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); + std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } @@ -1732,7 +1732,7 @@ namespace NActors { Mailboxes.erase(mboxId); } - TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const { + TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const { auto it = ActorNames.find(actorId); if (it != ActorNames.end()) return it->second; @@ -1773,7 +1773,7 @@ namespace NActors { return TEST_ACTOR_RUNTIME; } - TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, + TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) : Delegatee(delegatee) @@ -1813,7 +1813,7 @@ namespace NActors { STFUNC(Reply) { Y_VERIFY(!HasReply); IEventHandle *requestEv = Context->Queue->Head(); - TActorId originalSender = requestEv->Sender; + TActorId originalSender = requestEv->Sender; HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get()); if (HasReply) { delete Context->Queue->Pop(); @@ -1857,11 +1857,11 @@ namespace NActors { return forwardedEv; } private: - const TActorId Delegatee; + const TActorId Delegatee; const bool IsSync; - const TVector<TActorId> AdditionalActors; + const TVector<TActorId> AdditionalActors; TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; - TActorId ReplyId; + TActorId ReplyId; bool HasReply; TDispatchOptions DelegateeOptions; TTestActorRuntimeBase* Runtime; @@ -1882,7 +1882,7 @@ namespace NActors { { } - IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override { + IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override { return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime, CreateReplyChecker); } diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index b7e8edd1c5..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -199,7 +199,7 @@ namespace NActors { typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; - typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; + typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; TTestActorRuntimeBase(THeSingleSystemEnv); @@ -213,7 +213,7 @@ namespace NActors { static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); - static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); + static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); TEventObserver SetObserverFunc(TEventObserver observerFunc); TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); TEventFilter SetEventFilter(TEventFilter filterFunc); @@ -232,20 +232,20 @@ namespace NActors { TInstant GetCurrentTime() const; void UpdateCurrentTime(TInstant newTime); void AdvanceCurrentTime(TDuration duration); - void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); + void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); virtual void Initialize(); ui32 GetNodeId(ui32 index = 0) const; ui32 GetNodeCount() const; ui64 AllocateLocalId(); ui32 InterconnectPoolId() const; TString GetTempDir(); - TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0, + TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0, TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, - const TActorId& parentid = TActorId()); - TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, - const TActorId& parentid = TActorId()); - TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0); - TActorId AllocateEdgeActor(ui32 nodeIndex = 0); + const TActorId& parentid = TActorId()); + TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, + const TActorId& parentid = TActorId()); + TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0); + TActorId AllocateEdgeActor(ui32 nodeIndex = 0); TEventsList CaptureEvents(); TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId); TScheduledEventsList CaptureScheduledEvents(); @@ -260,13 +260,13 @@ namespace NActors { void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0); void ClearCounters(); ui64 GetCounter(ui32 evType) const; - TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0); - void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max()); - TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo); - void BlockOutputForActor(const TActorId& actorId); - IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const; - void EnableScheduleForActor(const TActorId& actorId, bool allow = true); - bool IsScheduleForActorEnabled(const TActorId& actorId) const; + TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0); + void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max()); + TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo); + void BlockOutputForActor(const TActorId& actorId); + IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const; + void EnableScheduleForActor(const TActorId& actorId, bool allow = true); + bool IsScheduleForActorEnabled(const TActorId& actorId) const; TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); void SetupMonitoring(); @@ -317,7 +317,7 @@ namespace NActors { template<class TEvent> typename TEvent::TPtr GrabEdgeEventIf( - const TSet<TActorId>& edgeFilter, + const TSet<TActorId>& edgeFilter, const std::function<bool(const typename TEvent::TPtr&)>& predicate, TDuration simTimeout = TDuration::Max()) { @@ -345,11 +345,11 @@ namespace NActors { template<class TEvent> typename TEvent::TPtr GrabEdgeEventIf( - const TActorId& edgeActor, + const TActorId& edgeActor, const std::function<bool(const typename TEvent::TPtr&)>& predicate, TDuration simTimeout = TDuration::Max()) { - TSet<TActorId> edgeFilter{edgeActor}; + TSet<TActorId> edgeFilter{edgeActor}; return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout); } @@ -368,13 +368,13 @@ namespace NActors { } template<class TEvent> - typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { + typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { return GrabEdgeEventIf<TEvent>(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout); } template<class TEvent> - typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { - TSet<TActorId> edgeFilter{edgeActor}; + typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { + TSet<TActorId> edgeFilter{edgeActor}; return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); } @@ -409,7 +409,7 @@ namespace NActors { } template<class TEvent> - typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { + typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { try { return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); } catch (...) { @@ -418,7 +418,7 @@ namespace NActors { } template<class TEvent> - typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { + typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { try { return GrabEdgeEvent<TEvent>(edgeActor, simTimeout); } catch (...) { @@ -462,7 +462,7 @@ namespace NActors { } void SetDispatcherRandomSeed(TInstant time, ui64 iteration); - TString GetActorName(const TActorId& actorId) const; + TString GetActorName(const TActorId& actorId) const; const TVector<ui64>& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; } void SetTxAllocatorTabletIds(const TVector<ui64>& ids) { TxAllocatorTabletIds = ids; } @@ -493,7 +493,7 @@ namespace NActors { } private: - IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; + IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); void ClearMailbox(ui32 nodeId, ui32 hint); @@ -526,7 +526,7 @@ namespace NActors { ui64 DispatchCyclesCount; ui64 DispatchedEventsCount; ui64 DispatchedEventsLimit = 2'500'000; - TActorId CurrentRecipient; + TActorId CurrentRecipient; ui64 DispatcherRandomSeed; TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; TAutoPtr<TLogBackend> LogBackend; @@ -559,9 +559,9 @@ namespace NActors { TIntrusivePtr<NInterconnect::TPollerThreads> Poller; volatile ui64* ActorSystemTimestamp; volatile ui64* ActorSystemMonotonic; - TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices; - TMap<TActorId, IActor*> LocalServicesActors; - TMap<IActor*, TActorId> ActorToActorId; + TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices; + TMap<TActorId, IActor*> LocalServicesActors; + TMap<IActor*, TActorId> ActorToActorId; THolder<TMailboxTable> MailboxTable; std::shared_ptr<void> AppData0; THolder<TActorSystem> ActorSystem; @@ -613,8 +613,8 @@ namespace NActors { TProgramShouldContinue ShouldContinue; TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; ui64 CurrentTimestamp; - TSet<TActorId> EdgeActors; - THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox; + TSet<TActorId> EdgeActors; + THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox; TDuration DispatchTimeout; TDuration ReschedulingDelay; TEventObserver ObserverFunc; @@ -622,10 +622,10 @@ namespace NActors { TEventFilter EventFilterFunc; TScheduledEventFilter ScheduledEventFilterFunc; TRegistrationObserver RegistrationObserver; - TSet<TActorId> BlockedOutput; - TSet<TActorId> ScheduleWhiteList; - THashMap<TActorId, TActorId> ScheduleWhiteListParent; - THashMap<TActorId, TString> ActorNames; + TSet<TActorId> BlockedOutput; + TSet<TActorId> ScheduleWhiteList; + THashMap<TActorId, TActorId> ScheduleWhiteListParent; + THashMap<TActorId, TString> ActorNames; TDispatchContext* CurrentDispatchContext; TVector<ui64> TxAllocatorTabletIds; @@ -686,7 +686,7 @@ namespace NActors { class IStrandingDecoratorFactory { public: virtual ~IStrandingDecoratorFactory() {} - virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0; + virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0; }; struct IReplyChecker { |