diff options
| author | xenoxeno <[email protected]> | 2023-03-09 12:10:01 +0300 |
|---|---|---|
| committer | xenoxeno <[email protected]> | 2023-03-09 12:10:01 +0300 |
| commit | ad607bb887619f321dec03b02df8220e01b7f5aa (patch) | |
| tree | 7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/core | |
| parent | 6324d075a5e80b6943b5de6b465b775050fe83df (diff) | |
light events for actor system
Diffstat (limited to 'library/cpp/actors/core')
23 files changed, 1241 insertions, 199 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index 00eef387ea4..304f7405aea 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -16,6 +16,22 @@ namespace NActors { return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId)); } + bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept { + return SelfActorId.Send(recipient, ev); + } + + bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept { + return SelfActorId.Send(recipient, ev, flags); + } + + bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept { + return SelfActorId.Send(recipient, ev, flags, cookie); + } + + bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept { + return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId)); + } + void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); } @@ -29,15 +45,15 @@ namespace NActors { } void TActorIdentity::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie); + return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie); } void TActorIdentity::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie); + return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie); } void TActorIdentity::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const { - return TActivationContext::Schedule(delta, new IEventHandle(*this, {}, ev), cookie); + return TActivationContext::Schedule(delta, new IEventHandleFat(*this, {}, ev), cookie); } TActorId TActivationContext::RegisterWithSameMailbox(IActor* actor, TActorId parentId) { @@ -75,27 +91,27 @@ namespace NActors { } void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); + ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie); } void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); + ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie); } void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie); + ExecutorThread.Schedule(delta, new IEventHandleFat(SelfID, TActorId(), ev), cookie); } void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); + TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie); } void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); + TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie); } void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie); + TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie); } TInstant TActivationContext::Now() { @@ -145,7 +161,7 @@ namespace NActors { (actor->*StateFunc)(ev, TActivationContext::AsActorContext()); } - void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandle> ev) { + void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev) { Y_VERIFY(!!ev && ev->GetBase()); ev->GetBase()->Execute(actor, std::move(ev)); } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 0e109da8192..08a3beb2bfc 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -59,6 +59,12 @@ namespace NActors { template <ESendingType SendingType = ESendingType::Common> static bool Send(std::unique_ptr<IEventHandle> &&ev); + template <ESendingType SendingType = ESendingType::Common> + static bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient); + + template <ESendingType SendingType = ESendingType::Common> + static bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient); + /** * Schedule one-shot event that will be send at given time point in the future. * @@ -132,10 +138,16 @@ namespace NActors { template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; template <ESendingType SendingType = ESendingType::Common> + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; + template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId)); } template <ESendingType SendingType = ESendingType::Common> + bool Send(const TActorId& recipient, THolder<IEventHandleLight> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { + return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId)); + } + template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId)); } @@ -145,6 +157,15 @@ namespace NActors { bool Send(std::unique_ptr<IEventHandle> &&ev) const { return Send<SendingType>(TAutoPtr<IEventHandle>(ev.release())); } + template <ESendingType SendingType = ESendingType::Common> + bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const; + template <ESendingType SendingType = ESendingType::Common> + bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const; + template <ESendingType SendingType = ESendingType::Common, typename TEventHandle> + bool Forward(TAutoPtr<TEventHandle>& ev, const TActorId& recipient) const { + TAutoPtr<IEventHandle> evi(ev.Release()); + return Forward(evi, recipient); + } TInstant Now() const; TMonotonic Monotonic() const; @@ -206,6 +227,15 @@ namespace NActors { template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; + template <ESendingType SendingType = ESendingType::Common> + bool Send(const TActorId& recipient, IEventHandleLight* ev) const; + template <ESendingType SendingType = ESendingType::Common> + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const; + template <ESendingType SendingType = ESendingType::Common> + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const; + template <ESendingType SendingType = ESendingType::Common> + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const; + bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; @@ -253,7 +283,7 @@ namespace NActors { class TActorVirtualBehaviour { public: - static void Receive(IActor* actor, std::unique_ptr<IEventHandle> ev); + static void Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev); public: }; @@ -262,8 +292,7 @@ namespace NActors { using TBase = IActor; friend class TDecorator; public: - typedef void (IActor::* TReceiveFunc)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); - + using TReceiveFunc = void (IActor::*)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); private: TReceiveFunc StateFunc = nullptr; public: @@ -303,6 +332,8 @@ namespace NActors { friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&); friend class TDecorator; protected: + using TReceiveFuncLight = void (IActor::*)(TAutoPtr<IEventHandle>& ev); + TReceiveFuncLight StateFuncLight = nullptr; TActorCallbackBehaviour CImpl; public: using TReceiveFunc = TActorCallbackBehaviour::TReceiveFunc; @@ -476,19 +507,28 @@ namespace NActors { TActorIdentity SelfId() const { return SelfActorId; } - void Receive(TAutoPtr<IEventHandle>& ev, const TActorContext& /*ctx*/) { + // void Receive(TAutoPtr<IEventHandle> ev, const TActorContext& /*ctx*/) { + // Receive(ev); + // } + void Receive(TAutoPtr<IEventHandle>& ev) { ++HandledEvents; - if (CImpl.Initialized()) { + if (StateFuncLight) { + (this->*StateFuncLight)(ev); + } else if (CImpl.Initialized()) { CImpl.Receive(this, ev); } else { - TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandle>(ev.Release())); + TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandleFat>(IEventHandleFat::MakeFat(ev).Release())); } } protected: void Describe(IOutputStream&) const noexcept override; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; - bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { + bool Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept; + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept; + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept; + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept; + bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{ return Send(recipient, ev.Release(), flags, cookie, std::move(traceId)); } bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { @@ -511,6 +551,20 @@ namespace NActors { return Send(recipient, ev.release(), flags, cookie, std::move(traceId)); } + bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) { + return TActivationContext::Forward(ev, recipient); + } + + bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) { + return TActivationContext::Forward(ev, recipient); + } + + template <typename TEventHandle> + bool Forward(TAutoPtr<TEventHandle>& ev, const TActorId& recipient) const { + TAutoPtr<IEventHandle> evi(ev.Release()); + return Forward(evi, recipient); + } + void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; @@ -574,8 +628,78 @@ namespace NActors { } }; + template<typename TDerived> + class TActorCallback : public IActorCallback { + public: + using TFatReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&, const TActorContext&); + using TLightReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&); + + private: + void BecomeFat(TFatReceiveFunc stateFunc) { + IActorCallback::Become(stateFunc); + } + + template<typename... TArgs> + void BecomeFat(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) { + IActorCallback::Become(stateFunc, ctx, std::forward<TArgs>(args)...); + } + + template<typename... TArgs> + void BecomeFat(TFatReceiveFunc stateFunc, TArgs&&... args) { + IActorCallback::Become(stateFunc, std::forward<TArgs>(args)...); + } + + void BecomeLight(TLightReceiveFunc stateFuncLight) { + StateFuncLight = static_cast<TReceiveFuncLight>(stateFuncLight); + } + + public: + TActorCallback(TFatReceiveFunc stateFunc, ui32 activityType = OTHER) + : IActorCallback(static_cast<TReceiveFunc>(stateFunc), activityType) + { + } + + TActorCallback(TLightReceiveFunc stateFunc, ui32 activityType = OTHER) + : IActorCallback(nullptr, activityType) + { + Become(stateFunc); + } + + template<typename T> + void Become(T stateFunc) { + IActorCallback::Become(stateFunc); + } + + template<typename T, typename... TArgs> + void Become(T stateFunc, const TActorContext&, TArgs&&... args) { + IActorCallback::Become(stateFunc); + Schedule(std::forward<TArgs>(args)...); + } + + template<typename T, typename... TArgs> + void Become(T stateFunc, TArgs&&... args) { + IActorCallback::Become(stateFunc); + Schedule(std::forward<TArgs>(args)...); + } + + template<> + void Become<TLightReceiveFunc>(TLightReceiveFunc stateFunc) { + BecomeLight(stateFunc); + } + + template<> + void Become<TFatReceiveFunc>(TFatReceiveFunc stateFunc) { + BecomeFat(stateFunc); + } + + template<typename... TArgs> + void Become(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) { + BecomeFat(stateFunc, ctx, std::forward<TArgs>(args)...); + } + }; + template <typename TDerived> - class TActor: public IActorCallback { + class TActor: public TActorCallback<TDerived> { private: template <typename T, typename = const char*> struct HasActorName: std::false_type { }; @@ -601,14 +725,20 @@ namespace NActors { protected: //* Comment this function to find unmarked activities static constexpr IActor::EActivityType ActorActivityType() { - return EActorActivity::OTHER; + return IActorCallback::EActorActivity::OTHER; } //*/ // static constexpr char ActorName[] = "UNNAMED"; - TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx), ui32 activityType = GetActivityTypeIndex()) - : IActorCallback(static_cast<TReceiveFunc>(func), activityType) - { } + TActor(typename TActorCallback<TDerived>::TFatReceiveFunc func, ui32 activityType = GetActivityTypeIndex()) + : TActorCallback<TDerived>(func, activityType) + { + } + + TActor(typename TActorCallback<TDerived>::TLightReceiveFunc func, ui32 activityType = GetActivityTypeIndex()) + : TActorCallback<TDerived>(func, activityType) + { + } public: typedef TDerived TThis; @@ -617,8 +747,10 @@ namespace NActors { #define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx #define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev +#define LIGHTFN_SIG TAutoPtr<::NActors::IEventHandle>& ev #define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx) #define STATEFN(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ) +#define LIGHTFN(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev) #define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype); @@ -630,13 +762,26 @@ namespace NActors { UNHANDLED_MSG_HANDLER \ } +#define LIGHTFN_BODY(HANDLERS, UNHANDLED_MSG_HANDLER) \ + switch (const ui32 etype = ev->GetTypeRewrite()) { \ + HANDLERS \ + default: \ + UNHANDLED_MSG_HANDLER \ + } + #define STRICT_STFUNC_BODY(HANDLERS) STFUNC_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER) +#define STRICT_LIGHTFN_BODY(HANDLERS) LIGHTFN_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER) #define STRICT_STFUNC(NAME, HANDLERS) \ void NAME(STFUNC_SIG) { \ STRICT_STFUNC_BODY(HANDLERS) \ } +#define STRICT_LIGHTFN(NAME, HANDLERS) \ + void NAME(LIGHTFN_SIG) { \ + STRICT_LIGHTFN_BODY(HANDLERS) \ + } + #define STRICT_STFUNC_EXC(NAME, HANDLERS, EXCEPTION_HANDLERS) \ void NAME(STFUNC_SIG) { \ try { \ @@ -681,7 +826,7 @@ namespace NActors { STFUNC(State) { if (DoBeforeReceiving(ev, ctx)) { - Actor->Receive(ev, ctx); + Actor->Receive(ev); DoAfterReceiving(ctx); } } @@ -777,8 +922,26 @@ namespace NActors { } template <ESendingType SendingType> + bool TActivationContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) { + IEventHandle::Forward(ev, recipient); + return Send(ev); + } + + template <ESendingType SendingType> + bool TActivationContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) { + IEventHandle::Forward(ev, recipient); + return Send(ev); + } + + template <ESendingType SendingType> bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); + return Send<SendingType>(new IEventHandleFat(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); + } + + template <ESendingType SendingType> + bool TActorContext::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { + ev->PrepareSend(recipient, SelfID, flags, cookie, std::move(traceId)); + return Send<SendingType>(ev); } template <ESendingType SendingType> @@ -787,6 +950,18 @@ namespace NActors { } template <ESendingType SendingType> + bool TActorContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const { + IEventHandle::Forward(ev, recipient); + return ExecutorThread.Send<SendingType>(ev); + } + + template <ESendingType SendingType> + bool TActorContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const { + IEventHandle::Forward(ev, recipient); + return ExecutorThread.Send<SendingType>(ev); + } + + template <ESendingType SendingType> TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) { return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, parentId); } @@ -798,7 +973,31 @@ namespace NActors { template <ESendingType SendingType> bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); + return TActivationContext::Send<SendingType>(new IEventHandleFat(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); + } + + template <ESendingType SendingType> + bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev) const { + ev->PrepareSend(recipient, *this); + return TActivationContext::Send<SendingType>(ev); + } + + template <ESendingType SendingType> + bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const { + ev->PrepareSend(recipient, *this, flags); + return TActivationContext::Send<SendingType>(ev); + } + + template <ESendingType SendingType> + bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const { + ev->PrepareSend(recipient, *this, flags, cookie); + return TActivationContext::Send<SendingType>(ev); + } + + template <ESendingType SendingType> + bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { + ev->PrepareSend(recipient, *this, flags, cookie, std::move(traceId)); + return TActivationContext::Send<SendingType>(ev); } template <ESendingType SendingType> diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h index 9d89afcf709..37286d48e28 100644 --- a/library/cpp/actors/core/actor_bootstrapped.h +++ b/library/cpp/actors/core/actor_bootstrapped.h @@ -12,7 +12,7 @@ namespace NActors { class TActorBootstrapped: public TActor<TDerived> { protected: TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { - return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0); + return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0); } STFUNC(StateBootstrap) { diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index 48c6f4d2d0a..6b55228bd75 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -44,7 +44,7 @@ namespace NActors { THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TInstant deadline) { const ui64 cookie = ++WaitCookie; if (deadline != TInstant::Max()) { - TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie)); + TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie)); } // ensure we have no unprocessed event and return back to actor system to receive one diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index fc9ddeef1f8..9fca8598f18 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -115,6 +115,11 @@ namespace NActors { bool Send(TAutoPtr<IEventHandle> ev); + bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) { + IEventHandle::Forward(ev, recipient); + return Send(ev.Release()); + } + void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) { return GetActorContext().Schedule(delta, ev, cookie); } @@ -157,7 +162,7 @@ namespace NActors { {} TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override { - return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0); + return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0); } private: diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index 86b19af546e..fe492e531e3 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { ctx.RegisterWithSameMailbox(new TDummyActor()); } if (Role == ERole::Leader) { - TAutoPtr<IEventHandle> ev = new IEventHandle(Receiver, SelfId(), new TEvents::TEvPing()); + TAutoPtr<IEventHandle> ev = new IEventHandleFat(Receiver, SelfId(), new TEvents::TEvPing()); SpecialSend(ev, ctx); } } @@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } if (AllocatesMemory) { - SpecialSend(new IEventHandle(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx); + SpecialSend(new IEventHandleFat(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx); } else { std::swap(*const_cast<TActorId*>(&ev->Sender), *const_cast<TActorId*>(&ev->Recipient)); ev->DropRewrite(); @@ -541,14 +541,14 @@ Y_UNIT_TEST_SUITE(TestDecorator) { { } - bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) override { + bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { *Counter += 1; if (ev->Type != TEvents::THelloWorld::Pong) { - TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing()); + TAutoPtr<IEventHandle> pingEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPing()); SavedEvent = ev; - Actor->Receive(pingEv, ctx); + Actor->Receive(pingEv); } else { - Actor->Receive(SavedEvent, ctx); + Actor->Receive(SavedEvent); } return false; } @@ -566,7 +566,7 @@ Y_UNIT_TEST_SUITE(TestDecorator) { bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { *Counter += 1; if (ev->Type == TEvents::THelloWorld::Ping) { - TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong()); + TAutoPtr<IEventHandle> pongEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPong()); Send(SelfId(), new TEvents::TEvPong()); return false; } @@ -701,7 +701,7 @@ Y_UNIT_TEST_SUITE(TestStateFunc) { auto sender = runtime.AllocateEdgeActor(); auto testActor = runtime.Register(new TTestActorWithExceptionsStateFunc()); for (ui64 tag = 0; tag < 4; ++tag) { - runtime.Send(new IEventHandle(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true); + runtime.Send(new IEventHandleFat(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender); UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Tag, tag); } diff --git a/library/cpp/actors/core/actor_virtual.h b/library/cpp/actors/core/actor_virtual.h index c9c34c4729d..1306252b58d 100644 --- a/library/cpp/actors/core/actor_virtual.h +++ b/library/cpp/actors/core/actor_virtual.h @@ -8,7 +8,7 @@ template <class TEvent> class TEventContext { private: TEvent* Event; - std::unique_ptr<IEventHandle> Handle; + std::unique_ptr<IEventHandleFat> Handle; public: const TEvent* operator->() const { return Event; @@ -16,7 +16,7 @@ public: const IEventHandle& GetHandle() const { return *Handle; } - TEventContext(std::unique_ptr<IEventHandle> handle) + TEventContext(std::unique_ptr<IEventHandleFat> handle) : Handle(std::move(handle)) { Y_VERIFY_DEBUG(dynamic_cast<TEvent*>(Handle->GetBase())); @@ -28,7 +28,7 @@ public: template <class TEvent, class TExpectedActor> class IEventForActor: public IEventBase { protected: - virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override { + virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override { Y_VERIFY_DEBUG(dynamic_cast<TExpectedActor*>(actor)); auto* actorCorrect = static_cast<TExpectedActor*>(actor); TEventContext<TEvent> context(std::move(eventPtr)); @@ -41,7 +41,7 @@ public: template <class TBaseEvent, class TEvent, class TExpectedObject> class IEventForAnything: public TBaseEvent { protected: - virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override { + virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override { auto* objImpl = dynamic_cast<TExpectedObject*>(actor); if (!objImpl) { return false; diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index 1ba4852f538..b33861a9668 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -74,7 +74,7 @@ namespace NActors { if (recpNodeId != NodeId && recpNodeId != 0) { // if recipient is not local one - rewrite with forward instruction - Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable()); + //Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable()); Y_VERIFY(ev->Recipient == recipient, "Event rewrite from %s to %s would be lost via interconnect", ev->Recipient.ToString().c_str(), @@ -96,7 +96,7 @@ namespace NActors { } if (target != actorId) { // a race has occured, terminate newly created actor - Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0)); + Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0)); } } recipient = target; @@ -110,8 +110,9 @@ namespace NActors { return true; } } - - Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown)); + if (ev) { + Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown)); + } return false; } @@ -121,7 +122,7 @@ namespace NActors { bool TActorSystem::GenericSend<&IExecutorPool::SpecificSend>(TAutoPtr<IEventHandle> ev) const; bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie) const { - return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie)); + return this->Send(new IEventHandleFat(recipient, DefSelfID, ev, flags, cookie)); } bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev) const { @@ -139,6 +140,14 @@ namespace NActors { } } + bool TActorSystem::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const { + return this->Send(ev->PrepareSend(recipient, DefSelfID, flags, cookie)); + } + + bool TActorSystem::Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags, ui64 cookie) const { + return this->Send(ev->PrepareSend(recipient, sender, flags, cookie)); + } + void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { Schedule(deadline - Timestamp(), ev, cookie); } diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 7002a9f44e8..b506a1e3601 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -204,6 +204,8 @@ namespace NActors { bool SpecificSend(TAutoPtr<IEventHandle> ev) const; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const; + bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const; + bool Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const; /** * Schedule one-shot event that will be send at given time point in the future. diff --git a/library/cpp/actors/core/ask.cpp b/library/cpp/actors/core/ask.cpp index 821c7606d4d..e3fe6d9ee3f 100644 --- a/library/cpp/actors/core/ask.cpp +++ b/library/cpp/actors/core/ask.cpp @@ -47,9 +47,9 @@ namespace NActors { if (ev->GetTypeRewrite() == TTimeout::EventType) { Promise_.SetException(std::make_exception_ptr(yexception() << "ask timeout")); } else if (!ExpectedEventType_ || ev->GetTypeRewrite() == ExpectedEventType_) { - Promise_.SetValue(ev->ReleaseBase()); + Promise_.SetValue(IEventHandleFat::GetFat(ev.Get())->ReleaseBase()); } else { - Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << ev->GetBase()->ToString())); + Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << IEventHandleFat::GetFat(ev.Get())->GetBase()->ToString())); } PassAway(); diff --git a/library/cpp/actors/core/av_bootstrapped.cpp b/library/cpp/actors/core/av_bootstrapped.cpp index 771177242ec..e1123027926 100644 --- a/library/cpp/actors/core/av_bootstrapped.cpp +++ b/library/cpp/actors/core/av_bootstrapped.cpp @@ -7,7 +7,7 @@ public: }; TAutoPtr<NActors::IEventHandle> TActorAutoStart::AfterRegister(const TActorId& self, const TActorId& parentId) { - return new IEventHandle(self, parentId, new TEventForStart, 0); + return new IEventHandleFat(self, parentId, new TEventForStart, 0); } void TActorAutoStart::ProcessEvent(TEventContext<TEventForStart>& ev) { diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index 6a5230f825e..b963dfb4ea0 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -7,7 +7,79 @@ namespace NActors { Max<ui64>(), Max<ui64>() }; - TIntrusivePtr<TEventSerializedData> IEventHandle::ReleaseChainBuffer() { + TAutoPtr<IEventHandle>& IEventHandle::Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient) { + if (ev->IsEventLight()) { + IEventHandleLight::GetLight(ev.Get())->Forward(recipient); + } else { + ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient); + } + return ev; + } + + THolder<IEventHandle>& IEventHandle::Forward(THolder<IEventHandle>& ev, TActorId recipient) { + if (ev->IsEventLight()) { + IEventHandleLight::GetLight(ev.Get())->Forward(recipient); + } else { + ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient); + } + return ev; + } + + TString IEventHandle::GetTypeName() const { + if (IsEventFat()) { + auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this)); + return ev->HasEvent() ? TypeName(*(ev->GetBase())) : TypeName(*this); + } else { + return TypeName(*this); + } + } + + TString IEventHandle::ToString() const { + if (IsEventFat()) { + auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this)); + return ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?"; + } else { + // TODO(xenoxeno): + return TypeName(*this); + } + } + + bool IEventHandle::HasEvent() const { + if (IsEventLight()) { + return true; + } else { + return IEventHandleFat::GetFat(this)->HasEvent(); + } + } + + bool IEventHandle::HasBuffer() const { + if (IsEventLight()) { + return false; + } else { + return IEventHandleFat::GetFat(this)->HasBuffer(); + } + } + + TActorId IEventHandle::GetForwardOnNondeliveryRecipient() const { + if (IsEventLight()) { + return {}; + } else { + return IEventHandleFat::GetFat(this)->GetForwardOnNondeliveryRecipient(); + } + } + + size_t IEventHandle::GetSize() const { + if (IsEventLight()) { + if (IsEventSerializable()) { + return IEventHandleLightSerializable::GetLightSerializable(this)->GetSize(); + } + } else { + return IEventHandleFat::GetFat(this)->GetSize(); + } + return 0; + } + + TIntrusivePtr<TEventSerializedData> IEventHandleFat::ReleaseChainBuffer() { if (Buffer) { TIntrusivePtr<TEventSerializedData> result; DoSwap(result, Buffer); @@ -24,7 +96,7 @@ namespace NActors { return new TEventSerializedData; } - TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { + TIntrusivePtr<TEventSerializedData> IEventHandleFat::GetChainBuffer() { if (Buffer) { return Buffer; } @@ -36,4 +108,6 @@ namespace NActors { } return new TEventSerializedData; } + + std::vector<std::vector<IEventFactory*>*> TEventFactories::EventFactories; } diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 4eedeb0574a..28e2c95e000 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -9,6 +9,7 @@ #include <util/system/hp_timer.h> #include <util/generic/maybe.h> +#include <util/string/builder.h> namespace NActors { class TChunkSerializer; @@ -23,7 +24,7 @@ namespace NActors { public ISerializerToStream { protected: // for compatibility with virtual actors - virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandle> /*eventPtr*/) { + virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandleFat> /*eventPtr*/) { Y_VERIFY_DEBUG(false); return false; } @@ -33,7 +34,7 @@ namespace NActors { virtual ~IEventBase() { } - bool Execute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) { + bool Execute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) { return DoExecute(actor, std::move(eventPtr)); } @@ -53,8 +54,70 @@ namespace NActors { virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; } }; - // fat handle - class IEventHandle : TNonCopyable { + struct IEventHandleFields { + enum EFlags : ui32 { + FlagTrackDelivery = 1 << 0, + FlagForwardOnNondelivery = 1 << 1, + FlagSubscribeOnSession = 1 << 2, + FlagUseSubChannel = 1 << 3, + FlagGenerateUnsureUndelivered = 1 << 4, + FlagExtendedFormat = 1 << 5, + FlagLight = 1 << 18, + FlagSerializable = 1 << 19, + }; + + static constexpr ui32 STICKY_FLAGS = (FlagLight | FlagSerializable); + + TActorId Recipient = {}; + TActorId Sender = {}; + ui32 Type = {}; + +#pragma pack(push, 1) + union { + ui32 Flags = {}; + struct { + ui32 NormalFlags:6; + ui32 ReservedFlags:12; + ui32 StickyFlags:2; + ui32 Channel:12; + }; + struct { + // flags + ui32 TrackDelivery:1; + ui32 ForwardOnNondeliveryFlag:1; + ui32 SubscribeOnSession:1; + ui32 UseSubChannel:1; + ui32 GenerateUnsureUndelivered:1; + ui32 ExtendedFormat:1; + // reserved + ui32 Reserved:12; + // sticky flags + ui32 Light:1; + ui32 Serializable:1; + }; + }; +#pragma pack(pop) + + ui64 Cookie = {}; + + TActorId RewriteRecipient = {}; + ui32 RewriteType = {}; + + NWilson::TTraceId TraceId = {}; + +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + ::NHPTimer::STime SendTime = 0; +#endif + }; + + class IEventHandle : public IEventHandleFields { + public: + IEventHandle(IEventHandleFields&& fields = {}) + : IEventHandleFields(std::move(fields)) + {} + + virtual ~IEventHandle() = default; + struct TOnNondelivery { TActorId Recipient; @@ -64,28 +127,142 @@ namespace NActors { } }; + THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events + + ui16 GetChannel() const noexcept { + return Channel; + } + + ui64 GetSubChannel() const noexcept { + return UseSubChannel ? Sender.LocalId() : 0ULL; + } + + // deprecate(xenoxeno) ? + static const size_t ChannelBits = 12; + static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits; + + static ui32 MakeFlags(ui32 channel, ui32 flags) { + Y_VERIFY(channel < (1 << ChannelBits)); + Y_VERIFY(flags < (1 << ChannelShift)); + return (flags | (channel << ChannelShift)); + } + // + + void SetFlags(ui32 flags) { + Flags = (flags & ~STICKY_FLAGS) | (Flags & STICKY_FLAGS); + } + + const TActorId& GetRecipientRewrite() const { + return RewriteRecipient; + } + + void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) { + RewriteRecipient = recipientRewrite; + RewriteType = typeRewrite; + } + + void DropRewrite() { + RewriteRecipient = Recipient; + RewriteType = Type; + } + + ui32 GetTypeRewrite() const { + return RewriteType; + } + + bool IsEventLight() const { + return Light; + } + + bool IsEventFat() const { + return !Light; + } + + bool IsEventSerializable() const { + return Serializable; + } + + template<typename TEventType> + TEventType* Get(); + template<typename TEventType> + TEventType* CastAsLocal(); + template<typename TEventType> + TEventType* StaticCastAsLocal(); + bool HasEvent() const; + bool HasBuffer() const; + TString GetTypeName() const; + TString ToString() const; + size_t GetSize() const; + static TAutoPtr<IEventHandle>& Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient); + static THolder<IEventHandle>& Forward(THolder<IEventHandle>& ev, TActorId recipient); + static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure = false); + static TAutoPtr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure = false); + template<typename TEventType> + static TEventType* Release(TAutoPtr<IEventHandle>&); + template<typename TEventType> + static TEventType* Release(THolder<IEventHandle>&); + template<typename TEventType> + static TEventType* Release(std::unique_ptr<IEventHandle>&); + + template<typename TEventTypeSmartPtr> + static TAutoPtr<IEventHandle> ForwardOnNondelivery(TEventTypeSmartPtr& ev, ui32 reason, bool unsure = false) { + TAutoPtr<IEventHandle> evi(ev.Release()); + return ForwardOnNondelivery(evi, reason, unsure); + } + + TActorId GetForwardOnNondeliveryRecipient() const; + }; + + // fat handle + class IEventHandleFat : public IEventHandle, TNonCopyable { public: template <typename TEv> - inline TEv* CastAsLocal() const noexcept { + inline TEv* CastAsLocal() const noexcept { // cast with event check auto fits = GetTypeRewrite() == TEv::EventType; + constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());}; + if constexpr (couldBeCasted) { + return fits ? static_cast<TEv*>(Event.Get()) : nullptr; + } else { + Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data()); + } + } - return fits ? static_cast<TEv*>(Event.Get()) : nullptr; + template <typename TEv> + inline TEv* StaticCastAsLocal() const noexcept { // blind cast + constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());}; + if constexpr (couldBeCasted) { + return static_cast<TEv*>(Event.Get()); + } else { + Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data()); + } } template <typename TEventType> TEventType* Get() { if (Type != TEventType::EventType) - Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType); + Y_FAIL("%s", (TStringBuilder() + << "Event type " << Type + << " doesn't match the expected type " << TEventType::EventType + << " requested typename " << TypeName<TEventType>() + << " actual typename " << GetTypeName()).data()); - if (!Event) { + constexpr bool couldBeGot = requires() { Event.Reset(TEventType::Load(Buffer.Get())); + static_cast<TEventType*>(Event.Get()); + }; + if constexpr (couldBeGot) { + if (!Event) { + Event.Reset(TEventType::Load(Buffer.Get())); + } + + if (Event) { + return static_cast<TEventType*>(Event.Get()); + } + + Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data()); + } else { + Y_FAIL("Event type %" PRIu32 " couldn't be get as type %s", Type, TypeName<TEventType>().data()); } - - if (Event) { - return static_cast<TEventType*>(Event.Get()); - } - - Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data()); } template <typename T> @@ -96,104 +273,43 @@ namespace NActors { return x; } - enum EFlags { - FlagTrackDelivery = 1 << 0, - FlagForwardOnNondelivery = 1 << 1, - FlagSubscribeOnSession = 1 << 2, - FlagUseSubChannel = 1 << 3, - FlagGenerateUnsureUndelivered = 1 << 4, - FlagExtendedFormat = 1 << 5, - }; - - const ui32 Type; - const ui32 Flags; - const TActorId Recipient; - const TActorId Sender; - const ui64 Cookie; const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect - // if set, used by ActorSystem/Interconnect to report tracepoints - NWilson::TTraceId TraceId; - // filled if feeded by interconnect session const TActorId InterconnectSession; -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - ::NHPTimer::STime SendTime; -#endif - - static const size_t ChannelBits = 12; - static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits; #ifdef USE_ACTOR_CALLSTACK TCallstack Callstack; #endif - ui16 GetChannel() const noexcept { - return Flags >> ChannelShift; - } - - ui64 GetSubChannel() const noexcept { - return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL; - } - - static ui32 MakeFlags(ui32 channel, ui32 flags) { - Y_VERIFY(channel < (1 << ChannelBits)); - Y_VERIFY(flags < (1 << ChannelShift)); - return (flags | (channel << ChannelShift)); - } - private: THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; - TActorId RewriteRecipient; - ui32 RewriteType; - - THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events - public: - void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) { - RewriteRecipient = recipientRewrite; - RewriteType = typeRewrite; - } - - void DropRewrite() { - RewriteRecipient = Recipient; - RewriteType = Type; - } - - const TActorId& GetRecipientRewrite() const { - return RewriteRecipient; - } - - ui32 GetTypeRewrite() const { - return RewriteType; - } - TActorId GetForwardOnNondeliveryRecipient() const { return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId(); } - IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, + IEventHandleFat(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) - : Type(ev->Type()) - , Flags(flags) - , Recipient(recipient) - , Sender(sender) - , Cookie(cookie) - , TraceId(std::move(traceId)) -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - , SendTime(0) -#endif + : IEventHandle({ + .Recipient = recipient, + .Sender = sender, + .Type = ev->Type(), + .Flags = flags, + .Cookie = cookie, + .RewriteRecipient = recipient, + .RewriteType = ev->Type(), + .TraceId = std::move(traceId), + }) , Event(ev) - , RewriteRecipient(Recipient) - , RewriteType(Type) { if (forwardOnNondelivery) OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery)); } - IEventHandle(ui32 type, + IEventHandleFat(ui32 type, ui32 flags, const TActorId& recipient, const TActorId& sender, @@ -201,25 +317,24 @@ namespace NActors { ui64 cookie, const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) - : Type(type) - , Flags(flags) - , Recipient(recipient) - , Sender(sender) - , Cookie(cookie) - , TraceId(std::move(traceId)) -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - , SendTime(0) -#endif + : IEventHandle({ + .Recipient = recipient, + .Sender = sender, + .Type = type, + .Flags = flags, + .Cookie = cookie, + .RewriteRecipient = recipient, + .RewriteType = type, + .TraceId = std::move(traceId), + }) , Buffer(std::move(buffer)) - , RewriteRecipient(Recipient) - , RewriteType(Type) { if (forwardOnNondelivery) OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery)); } // Special ctor for events from interconnect. - IEventHandle(const TActorId& session, + IEventHandleFat(const TActorId& session, ui32 type, ui32 flags, const TActorId& recipient, @@ -228,20 +343,19 @@ namespace NActors { ui64 cookie, TScopeId originScopeId, NWilson::TTraceId traceId) noexcept - : Type(type) - , Flags(flags) - , Recipient(recipient) - , Sender(sender) - , Cookie(cookie) + : IEventHandle({ + .Recipient = recipient, + .Sender = sender, + .Type = type, + .Flags = flags, + .Cookie = cookie, + .RewriteRecipient = recipient, + .RewriteType = type, + .TraceId = std::move(traceId), + }) , OriginScopeId(originScopeId) - , TraceId(std::move(traceId)) , InterconnectSession(session) -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - , SendTime(0) -#endif , Buffer(std::move(buffer)) - , RewriteRecipient(Recipient) - , RewriteType(Type) { } @@ -286,28 +400,267 @@ namespace NActors { TAutoPtr<IEventHandle> Forward(const TActorId& dest) { if (Event) - return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId)); + return new IEventHandleFat(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId)); else - return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId)); + return new IEventHandleFat(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId)); + } + + static TAutoPtr<IEventHandleFat> MakeFat(TAutoPtr<IEventHandle> ev) { + if (ev->IsEventLight()) { + Y_FAIL("Can't make light event fat"); + } else { + TAutoPtr<IEventHandleFat> evb(static_cast<IEventHandleFat*>(ev.Release())); + return evb; + } } - TAutoPtr<IEventHandle> ForwardOnNondelivery(ui32 reason, bool unsure = false); + static IEventHandleFat* GetFat(IEventHandle* ev) { + if (ev->IsEventLight()) { + Y_FAIL("Can't make light event fat"); + } else { + return static_cast<IEventHandleFat*>(ev); + } + } + + static const IEventHandleFat* GetFat(const IEventHandle* ev) { + if (ev->IsEventLight()) { + Y_FAIL("Can't make light event fat"); + } else { + return static_cast<const IEventHandleFat*>(ev); + } + } + + static IEventHandleFat* GetFat(TAutoPtr<IEventHandle>& ev) { + return GetFat(ev.Get()); + } + + static IEventHandleFat* GetFat(THolder<IEventHandle>& ev) { + return GetFat(ev.Get()); + } + + static IEventHandleFat* GetFat(std::unique_ptr<IEventHandle>& ev) { + return GetFat(ev.get()); + } + + static TAutoPtr<IEventHandle> MakeBase(TAutoPtr<IEventHandleFat> ev) { + return ev.Release(); + } + + static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure = false); }; template <typename TEventType> - class TEventHandle: public IEventHandle { - TEventHandle(); // we never made instance of TEventHandle + class TEventHandleFat: public IEventHandleFat { + TEventHandleFat(); // we never made instance of TEventHandleFat public: TEventType* Get() { - return IEventHandle::Get<TEventType>(); + return IEventHandleFat::Get<TEventType>(); } TAutoPtr<TEventType> Release() { - return IEventHandle::Release<TEventType>(); + return IEventHandleFat::Release<TEventType>(); + } + }; + + static_assert(sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat), "expect sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat)"); + + // light handle + class IEventHandleLight : public IEventHandle { + public: + IEventHandleLight(ui32 type) { + RewriteType = Type = type; + Light = true; + } + + IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender) { + RewriteRecipient = Recipient = recipient; + Sender = sender; + return this; + } + + IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags) { + RewriteRecipient = Recipient = recipient; + Sender = sender; + SetFlags(flags); + return this; + } + + IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie) { + RewriteRecipient = Recipient = recipient; + Sender = sender; + SetFlags(flags); + Cookie = cookie; + return this; + } + + IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) { + RewriteRecipient = Recipient = recipient; + Sender = sender; + SetFlags(flags); + Cookie = cookie; + TraceId = std::move(traceId); + return this; + } + + void Forward(const TActorId& dest) { + RewriteRecipient = dest; + } + + static IEventHandleLight* GetLight(IEventHandle* ev) { + if (ev->IsEventLight()) { + return static_cast<IEventHandleLight*>(ev); + } else { + Y_FAIL("Can't make fat event light"); + } + } + + static IEventHandleLight* GetLight(TAutoPtr<IEventHandle>& ev) { + return GetLight(ev.Get()); + } + + static IEventHandleLight* GetLight(THolder<IEventHandle>& ev) { + return GetLight(ev.Get()); + } + + static IEventHandleLight* GetLight(std::unique_ptr<IEventHandle>& ev) { + return GetLight(ev.get()); + } + + static IEventHandleLight* ReleaseLight(TAutoPtr<IEventHandle>& ev) { + return GetLight(ev.Release()); + } + + static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure = false); + + template<typename TEventType> + TEventType* Get() { + if (Type != TEventType::EventType) { + Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType); + } + constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);}; + if constexpr (couldBeConverted) { + return static_cast<TEventType*>(this); + } else { + Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data()); + } + } + + template<typename TEventType> + TEventType* CastAsLocal() { + constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);}; + if constexpr (couldBeConverted) { + if (Type == TEventType::EventType) { + return static_cast<TEventType*>(this); + } + } + return nullptr; + } + + template<typename TEventType> + TEventType* StaticCastAsLocal() { + constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);}; + if constexpr (couldBeConverted) { + return static_cast<TEventType*>(this); + } + Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data()); } }; - static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)"); + template<typename TEventType> + TEventType* IEventHandle::Get() { + if (IsEventLight()) { + return IEventHandleLight::GetLight(this)->Get<TEventType>(); + } else { + return IEventHandleFat::GetFat(this)->Get<TEventType>(); + } + } + + template<typename TEventType> + TEventType* IEventHandle::CastAsLocal() { + if (IsEventLight()) { + return IEventHandleLight::GetLight(this)->CastAsLocal<TEventType>(); + } else { + return IEventHandleFat::GetFat(this)->CastAsLocal<TEventType>(); + } + } + + template<typename TEventType> + TEventType* IEventHandle::StaticCastAsLocal() { + if (IsEventLight()) { + return IEventHandleLight::GetLight(this)->StaticCastAsLocal<TEventType>(); + } else { + return IEventHandleFat::GetFat(this)->StaticCastAsLocal<TEventType>(); + } + } + + template<typename TEventType> + TEventType* IEventHandle::Release(TAutoPtr<IEventHandle>& ev) { + if (ev->IsEventLight()) { + return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>(); + } else { + return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release(); + } + } + + template<typename TEventType> + TEventType* IEventHandle::Release(THolder<IEventHandle>& ev) { + if (ev->IsEventLight()) { + return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>(); + } else { + return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release(); + } + } + + template<typename TEventType> + TEventType* IEventHandle::Release(std::unique_ptr<IEventHandle>& ev) { + if (ev->IsEventLight()) { + return IEventHandleLight::GetLight(ev.release())->CastAsLocal<TEventType>(); + } else { + return IEventHandleFat::GetFat(ev.get())->Release<TEventType>().Release(); + } + } + + + class IEventHandleLightSerializable; + + using TEventSerializer = std::function<bool(const IEventHandleLightSerializable*, TChunkSerializer*)>; + + class IEventHandleLightSerializable : public IEventHandleLight { + public: + TEventSerializer Serializer; + + IEventHandleLightSerializable(ui32 type, TEventSerializer serializer) + : IEventHandleLight(type) + , Serializer(std::move(serializer)) + { + Serializable = true; + } + + static IEventHandleLightSerializable* GetLightSerializable(IEventHandle* ev) { + if (ev->IsEventSerializable()) { + return static_cast<IEventHandleLightSerializable*>(ev); + } else { + Y_FAIL("Can't make serializable event"); + } + } + + static const IEventHandleLightSerializable* GetLightSerializable(const IEventHandle* ev) { + if (ev->IsEventSerializable()) { + return static_cast<const IEventHandleLightSerializable*>(ev); + } else { + Y_FAIL("Can't make serializable event"); + } + } + + static IEventHandleLightSerializable* GetLightSerializable(TAutoPtr<IEventHandle>& ev) { + return GetLightSerializable(ev.Get()); + } + + size_t GetSize() const { + // TODO(xenoxeno) + return 0; + } + }; template <typename TEventType, ui32 EventType0> class TEventBase: public IEventBase { @@ -318,7 +671,7 @@ namespace NActors { } // still abstract - typedef TEventHandle<TEventType> THandle; + typedef TEventHandleFat<TEventType> THandle; typedef TAutoPtr<THandle> TPtr; }; @@ -327,7 +680,7 @@ namespace NActors { return TString(header); \ } \ bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \ - Y_FAIL("Local event " #eventType " is not serializable"); \ + Y_FAIL("Local event %s is not serializable", TypeName(*this).data()); \ } \ static IEventBase* Load(NActors::TEventSerializedData*) { \ Y_FAIL("Local event " #eventType " has no load method"); \ @@ -349,4 +702,39 @@ namespace NActors { bool IsSerializable() const override { \ return true; \ } + + + struct TEventMeta { + TActorId Session; + ui32 Type; + ui32 Flags; + TActorId Recipient; + TActorId Sender; + ui64 Cookie; + TScopeId OriginScopeId; + NWilson::TTraceId TraceId; + TRope Data; + + bool IsExtendedFormat() const { + return Flags & IEventHandle::FlagExtendedFormat; + } + }; + + inline constexpr ui32 GetEventSpace(ui32 eventType) { + return (eventType >> 16u); + } + + inline constexpr ui32 GetEventSubType(ui32 eventType) { + return (eventType & (0xffff)); + } + + struct IEventFactory { + virtual IEventHandle* Construct(const TEventMeta& eventMeta) = 0; + virtual void Destruct(IEventHandle*) = 0; + }; + + struct TEventFactories { + // it supposed to be statically allocated in the begining + static std::vector<std::vector<IEventFactory*>*> EventFactories; // [EventSpace][EventSubType] + }; } diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index 30cc26aa463..b845760221e 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -7,7 +7,7 @@ #include <library/cpp/actors/wilson/wilson_trace.h> namespace NActors { - class IEventHandle; + class IEventHandleFat; struct TConstIoVec { const void* Data; diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h index 2845aa94dd9..5288a65a894 100644 --- a/library/cpp/actors/core/event_local.h +++ b/library/cpp/actors/core/event_local.h @@ -71,4 +71,20 @@ namespace NActors { return new TEv(); } }; + + template<typename TEv, ui32 EventType0> + class TEventLight : public IEventHandleLight { + public: + static constexpr ui32 EventType = EventType0; + + TEventLight() + : IEventHandleLight(EventType) + {} + + TEv* Get() { + return static_cast<TEv*>(this); + } + + using TPtr = TAutoPtr<TEv>; + }; } diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 594559cea78..ed696dc0f90 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -9,6 +9,8 @@ #include <util/generic/deque.h> #include <util/system/context.h> #include <util/system/filemap.h> +#include <util/string/builder.h> +#include <util/thread/lfstack.h> #include <array> namespace NActors { @@ -511,4 +513,290 @@ namespace NActors { dest->SetRawX1(src.RawX1()); dest->SetRawX2(src.RawX2()); } + + + + + + + + + + + + template<ui32 EventSpace> + struct TLightEventSpaceFactories { + public: + static std::vector<IEventFactory*>& GetEventSpaceFactories(ui32 eventSpace) { + if (TEventFactories::EventFactories.size() <= eventSpace) { + // it's only safe to do BEFORE initialization of ActorSystem + TEventFactories::EventFactories.resize(eventSpace + 1); + } + if (TEventFactories::EventFactories[eventSpace] == nullptr) { + TEventFactories::EventFactories[eventSpace] = new std::vector<IEventFactory*>(); + } + return *TEventFactories::EventFactories[eventSpace]; + } + + static void SetEventFactory(ui32 eventType, IEventFactory* factory) { + Y_VERIFY(GetEventSpace(eventType) == EventSpace); + auto eventSpaceFactories = GetEventSpaceFactories(GetEventSpace(eventType)); + if (eventSpaceFactories.size() <= GetEventSubType(eventType)) { + // it's only safe to do BEFORE initialization of ActorSystem + eventSpaceFactories.resize(GetEventSubType(eventType) + 1); + } + eventSpaceFactories[GetEventSubType(eventType)] = factory; + } + }; + + template<typename TEvent> + class TLightEventFactory : public IEventFactory, TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)> { + private: + mutable size_t CachedByteSize = 0; + + static constexpr char PayloadMarker = 0x07; + static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7; + + static size_t SerializeNumber(size_t num, char *buffer) { + char *begin = buffer; + do { + *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00); + num >>= 7; + } while (num); + return buffer - begin; + } + + static size_t DeserializeNumber(const char **ptr, const char *end) { + const char *p = *ptr; + size_t res = 0; + size_t offset = 0; + for (;;) { + if (p == end) { + return Max<size_t>(); + } + const char byte = *p++; + res |= (static_cast<size_t>(byte) & 0x7F) << offset; + offset += 7; + if (!(byte & 0x80)) { + break; + } + } + *ptr = p; + return res; + } + + static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) { + size_t res = 0; + size_t offset = 0; + for (;;) { + if (!iter.Valid()) { + return Max<size_t>(); + } + const char byte = *iter.ContiguousData(); + iter += 1; + --size; + res |= (static_cast<size_t>(byte) & 0x7F) << offset; + offset += 7; + if (!(byte & 0x80)) { + break; + } + } + return res; + } + + public: + void DeserializeProtoEvent(TEvent* event, const TEventMeta& eventMeta) { + TRope::TConstIterator iter = eventMeta.Data.Begin(); + ui64 size = eventMeta.Data.GetSize(); + if (eventMeta.IsExtendedFormat()) { + constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;}; + if constexpr (hasPayload) { + // check marker + if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { + Y_FAIL("Invalid event marker"); + } + // skip marker + iter += 1; + --size; + // parse number of payload ropes + size_t numRopes = DeserializeNumber(iter, size); + if (numRopes == Max<size_t>()) { + Y_FAIL("Invalid event rope number"); + } + while (numRopes--) { + // parse length of the rope + const size_t len = DeserializeNumber(iter, size); + if (len == Max<size_t>() || size < len) { + Y_FAIL("%s", (TStringBuilder() << "Invalid event len# " << len << " size# " << size).data()); + } + // extract the rope + TRope::TConstIterator begin = iter; + iter += len; + size -= len; + event->Payload.emplace_back(begin, iter); + } + } else { + Y_FAIL("%s", (TStringBuilder() << "Extended format is not supported for event " << TypeName<TEvent>()).data()); + } + } + // parse the protobuf + TRopeStream stream(iter, size); + if (!event->Record.ParsePartialFromZeroCopyStream(&stream)) { + Y_FAIL("%s", (TStringBuilder() << "Failed to parse protobuf event type " << eventMeta.Type << " class " << TypeName<TEvent>()).data()); + } + } + + static bool SerializeProtoEvent(const TEvent* event, TChunkSerializer* chunker) { + constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;}; + if constexpr (hasPayload) { + // serialize payload first + if (event->Payload) { + void *data; + int size = 0; + auto append = [&](const char *p, size_t len) { + while (len) { + if (size) { + const size_t numBytesToCopy = std::min<size_t>(size, len); + memcpy(data, p, numBytesToCopy); + data = static_cast<char*>(data) + numBytesToCopy; + size -= numBytesToCopy; + p += numBytesToCopy; + len -= numBytesToCopy; + } else if (!chunker->Next(&data, &size)) { + return false; + } + } + return true; + }; + auto appendNumber = [&](size_t number) { + char buf[MaxNumberBytes]; + return append(buf, SerializeNumber(number, buf)); + }; + char marker = PayloadMarker; + append(&marker, 1); + if (!appendNumber(event->Payload.size())) { + return false; + } + for (const TRope& rope : event->Payload) { + if (!appendNumber(rope.GetSize())) { + return false; + } + if (rope) { + if (size) { + chunker->BackUp(std::exchange(size, 0)); + } + if (!chunker->WriteRope(&rope)) { + return false; + } + } + } + if (size) { + chunker->BackUp(size); + } + } + } + return event->Record.SerializeToZeroCopyStream(chunker); + } + + public: + // TLockFreeStack<TEvent*> Stack; + // std::atomic<i32> StackSize; + // //std::deque<TEvent*> Stack; + + // static constexpr i32 MAX_STACK_SIZE = 10; + + TEvent* New() { + // TEvent* result; + // if (Stack.Dequeue(&result)) { + // --StackSize; + // return result; + // } + // /*if (!Stack.empty()) { + // TEvent* result = Stack.back(); + // Stack.pop_back(); + // return result; + // }*/ + return new TEvent(); + } + + void Delete(TEvent* event) { + // if (StackSize < MAX_STACK_SIZE) { + // Stack.Enqueue(event); + // ++StackSize; + // } else { + // delete event; + // } + // /*if (Stack.size() < MAX_STACK_SIZE) { + // event->Record.Clear(); + // Stack.push_back(event); + // return; + // }*/ + delete event; + } + + virtual IEventHandle* Construct(const TEventMeta& eventMeta) override { + IEventHandle* ev = nullptr; + if (eventMeta.Type == TEvent::EventType) { + TEvent* event = New(); + DeserializeProtoEvent(event, eventMeta); + } + return ev; + } + + virtual void Destruct(IEventHandle* ev) override { + if (ev->Type != TEvent::EventType) { + Y_FAIL("Wrong event supplied"); + } + Delete(static_cast<TEvent*>(ev)); + } + + TLightEventFactory() { + Cerr << "TLightEventFactory<" << TypeName<TEvent>() << ">()" << Endl; + TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)>::SetEventFactory(TEvent::EventType, this); + } + }; + + template<typename TEv> + struct TLightEventFactoryInitializer { + static TLightEventFactory<TEv> EventFactory; + + static TEv* New() { + return EventFactory.New(); + } + + static void Delete(TEv* event) { + EventFactory.Delete(event); + } + }; + + template<typename TEv> + TLightEventFactory<TEv> TLightEventFactoryInitializer<TEv>::EventFactory; + + template<typename TEv, typename TRecord, ui32 EventType0> + class TEventLightPB : public IEventHandleLightSerializable, public TLightEventFactoryInitializer<TEv> { + public: + static constexpr ui32 EventType = EventType0; + + TRecord Record; + + static bool SerializeProto(const IEventHandleLightSerializable* event, TChunkSerializer* chunker) { + return TLightEventFactory<TEv>::SerializeProtoEvent(static_cast<const TEv*>(event), chunker); + } + + TEventLightPB() + : IEventHandleLightSerializable(EventType, &TEventLightPB<TEv, TRecord, EventType0>::SerializeProto) + {} + + TEv* Get() { + return static_cast<TEv*>(this); + } + + using TPtr = TAutoPtr<TEv>; + }; + + template<typename TEv, typename TRecord, ui32 EventType0> + class TEventLightPBWithPayload : public TEventLightPB<TEv, TRecord, EventType0> { + public: + std::vector<TRope> Payload; + }; } diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp index dfd79bf96e9..ea5fd101243 100644 --- a/library/cpp/actors/core/events_undelivered.cpp +++ b/library/cpp/actors/core/events_undelivered.cpp @@ -38,23 +38,70 @@ namespace NActors { return new TEvUndelivered(sourceType, reason); } - TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(ui32 reason, bool unsure) { - if (Flags & FlagForwardOnNondelivery) { - const ui32 updatedFlags = Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession); - const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId(); + TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure) { + std::unique_ptr<IEventHandle> tev(ev.Release()); + TAutoPtr<IEventHandle> fw = ForwardOnNondelivery(tev, reason, unsure); + if (tev) { + // we don't want to delete original event handle here + ev = tev.release(); + } + return fw; + } + + TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure) { + if (ev->IsEventFat()) { + std::unique_ptr<IEventHandleFat> evf(IEventHandleFat::GetFat(ev.release())); + std::unique_ptr<IEventHandle> fw = IEventHandleFat::ForwardOnNondelivery(evf, reason, unsure); + if (evf) { + ev = std::unique_ptr<IEventHandle>(evf.release()); + } + return fw.release(); + } + if (ev->IsEventLight()) { + std::unique_ptr<IEventHandleLight> evl(IEventHandleLight::GetLight(ev.release())); + std::unique_ptr<IEventHandle> fw = IEventHandleLight::ForwardOnNondelivery(evl, reason, unsure); + if (evl) { + ev = std::unique_ptr<IEventHandle>(evl.release()); + } + return fw.release(); + } + return {}; + } - if (Event) - return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, std::move(TraceId)); + std::unique_ptr<IEventHandle> IEventHandleFat::ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure) { + if (ev->ForwardOnNondeliveryFlag) { + const ui32 updatedFlags = ev->Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession); + const TActorId recp = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId(); + + if (ev->Event) + return std::unique_ptr<IEventHandle>(new IEventHandleFat(recp, ev->Sender, ev->Event.Release(), updatedFlags, ev->Cookie, &ev->Recipient, std::move(ev->TraceId))); else - return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, std::move(TraceId)); + return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Type, updatedFlags, recp, ev->Sender, ev->Buffer, ev->Cookie, &ev->Recipient, std::move(ev->TraceId))); } - if (Flags & FlagTrackDelivery) { - const ui32 updatedFlags = Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered); - return new IEventHandle(Sender, Recipient, new TEvents::TEvUndelivered(Type, reason, unsure), updatedFlags, - Cookie, nullptr, std::move(TraceId)); + if (ev->TrackDelivery) { + const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered); + return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags, + ev->Cookie, nullptr, std::move(ev->TraceId))); } + return {}; + } - return nullptr; + std::unique_ptr<IEventHandle> IEventHandleLight::ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure) { + if (ev->ForwardOnNondeliveryFlag) { + ev->ForwardOnNondeliveryFlag = false; + ev->SubscribeOnSession = false; + auto recpt = ev->Recipient; + ev->Recipient = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId(); + ev->OnNondeliveryHolder = MakeHolder<TOnNondelivery>(recpt); + return std::unique_ptr<IEventHandle>(ev.release()); + } + + if (ev->TrackDelivery) { + const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered); + return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags, + ev->Cookie, nullptr, std::move(ev->TraceId))); + } + return {}; } } diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 3c5dc2c2b4d..50ae097a15c 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -96,13 +96,12 @@ namespace NActors { inline void LwTraceSlowDelivery(IEventHandle* ev, const std::type_info* actorType, ui32 poolId, const TActorId& currentRecipient, double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) { - const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr; LWPROBE(EventSlowDelivery, poolId, delivMs, sinceActivationMs, eventsExecutedBefore, - baseEv ? SafeTypeName(baseEv) : (ev ? ToString(ev->Type) : TString("nullptr")), + ev && ev->HasEvent() ? ev->GetTypeName() : (ev ? ToString(ev->Type) : TString("nullptr")), currentRecipient.ToString(), SafeTypeName(actorType)); } @@ -110,11 +109,10 @@ namespace NActors { inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const std::type_info* actorType, ui32 poolId, const TActorId& currentRecipient, double eventMs) { // Event could have been destroyed by actor->Receive(); - const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr; LWPROBE(SlowEvent, poolId, eventMs, - baseEv ? SafeTypeName(baseEv) : ToString(evTypeForTracing), + ev && ev->HasEvent() ? ev->GetTypeName() : ToString(evTypeForTracing), currentRecipient.ToString(), SafeTypeName(actorType)); } @@ -178,7 +176,7 @@ namespace NActors { NProfiling::TMemoryTagScope::Reset(ActorSystem->MemProfActivityBase + activityType); } - actor->Receive(ev, ctx); + actor->Receive(ev); size_t dyingActorsCnt = DyingActors.size(); Ctx.UpdateActorsStats(dyingActorsCnt); @@ -204,7 +202,7 @@ namespace NActors { } else { actorType = nullptr; - TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); + TAutoPtr<IEventHandle> nonDelivered = IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown); if (nonDelivered.Get()) { ActorSystem->Send(nonDelivered); } else { diff --git a/library/cpp/actors/core/io_dispatcher.cpp b/library/cpp/actors/core/io_dispatcher.cpp index 6bd753f2e04..cccb2336ffc 100644 --- a/library/cpp/actors/core/io_dispatcher.cpp +++ b/library/cpp/actors/core/io_dispatcher.cpp @@ -116,7 +116,7 @@ namespace NActors { bool sendNotify; if (!Actor.TaskQueue.Dequeue(tasks, &sendNotify)) { if (sendNotify) { - ActorSystem->Send(new IEventHandle(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(), + ActorSystem->Send(new IEventHandleFat(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(), nullptr, TThread::CurrentThreadId())); } break; diff --git a/library/cpp/actors/core/io_dispatcher.h b/library/cpp/actors/core/io_dispatcher.h index b0e4e60d1a7..a33cb0d98e6 100644 --- a/library/cpp/actors/core/io_dispatcher.h +++ b/library/cpp/actors/core/io_dispatcher.h @@ -28,7 +28,7 @@ namespace NActors { */ template<typename TCallback> static void InvokeIoCallback(TCallback&& callback, ui32 poolId, IActor::EActivityType activityType) { - if (!TActivationContext::Send(new IEventHandle(MakeIoDispatcherActorId(), TActorId(), + if (!TActivationContext::Send(new IEventHandleFat(MakeIoDispatcherActorId(), TActorId(), new TEvInvokeQuery(callback)))) { TActivationContext::Register(CreateExecuteLaterActor(std::move(callback), activityType), TActorId(), TMailboxType::HTSwap, poolId); diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index 5000216f50c..c7bed09f1ba 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -116,7 +116,7 @@ const auto& currentTracer = component; \ if (ev->HasEvent()) { \ LOG_TRACE(*TlsActivationContext, currentTracer, "%s, received event# %" PRIu32 ", Sender %s, Recipient %s: %s", \ - __FUNCTION__, ev->Type, ev->Sender.ToString().data(), SelfId().ToString().data(), ev->GetBase()->ToString().substr(0, 1000).data()); \ + __FUNCTION__, ev->Type, ev->Sender.ToString().data(), SelfId().ToString().data(), ev->ToString().substr(0, 1000).data()); \ } else { \ LOG_TRACE(*TlsActivationContext, currentTracer, "%s, received event# %" PRIu32 ", Sender %s, Recipient %s", \ __FUNCTION__, ev->Type, ev->Sender.ToString().data(), ev->Recipient.ToString().data()); \ @@ -205,7 +205,7 @@ namespace NActors { std::shared_ptr<NMonitoring::TMetricRegistry> metrics); ~TLoggerActor(); - void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { + void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { HFunc(TFlushLogBuffer, FlushLogBufferMessageEvent); HFunc(NLog::TEvLog, HandleLogEvent); @@ -316,7 +316,7 @@ namespace NActors { { const NLog::TSettings *mSettings = ctx.LoggerSettings(); TLoggerActor::Throttle(*mSettings); - ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); + ctx.Send(new IEventHandleFat(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); } template <typename TCtx, typename... TArgs> diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp index 995e3c4121c..572af788ec9 100644 --- a/library/cpp/actors/core/log_ut.cpp +++ b/library/cpp/actors/core/log_ut.cpp @@ -105,19 +105,19 @@ namespace { {} void WriteLog() { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); + Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); } void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg, TString msg = "foo") { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)}); + Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)}); } void FlushLogBuffer() { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TFlushLogBuffer()}); + Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TFlushLogBuffer()}); } void Wakeup() { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup}); + Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvents::TEvWakeup}); } TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()}; diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp index febc5e40dd2..fef051da619 100644 --- a/library/cpp/actors/core/scheduler_actor.cpp +++ b/library/cpp/actors/core/scheduler_actor.cpp @@ -234,7 +234,7 @@ namespace NActors { sentCount = Min(eventsToBeSentSize, Cfg.RelaxedSendPaceEventsPerCycle); } for (ui32 i = 0; i < sentCount; ++i) { - ctx.Send(EventsToBeSent.front()); + ctx.Send(EventsToBeSent.front().Release()); EventsToBeSent.pop_front(); } |
