diff options
author | xenoxeno <xeno@ydb.tech> | 2023-03-28 14:31:05 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-03-28 14:31:05 +0300 |
commit | 33421d638103cc382ba851d2491740e2db576307 (patch) | |
tree | 166e19b62c40deb088b62651e2a0cb86d4ed8f5c /library/cpp | |
parent | 8cf3b1d08aa8791cd5cb7ee2a11fbb712cd72d16 (diff) | |
download | ydb-33421d638103cc382ba851d2491740e2db576307.tar.gz |
revert light events
Diffstat (limited to 'library/cpp')
58 files changed, 364 insertions, 1375 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index 304f7405ae..00eef387ea 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -16,22 +16,6 @@ namespace NActors { return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId)); } - bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept { - return SelfActorId.Send(recipient, ev); - } - - bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept { - return SelfActorId.Send(recipient, ev, flags); - } - - bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept { - return SelfActorId.Send(recipient, ev, flags, cookie); - } - - bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept { - return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId)); - } - void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); } @@ -45,15 +29,15 @@ namespace NActors { } void TActorIdentity::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie); + return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie); } void TActorIdentity::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie); + return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie); } void TActorIdentity::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const { - return TActivationContext::Schedule(delta, new IEventHandleFat(*this, {}, ev), cookie); + return TActivationContext::Schedule(delta, new IEventHandle(*this, {}, ev), cookie); } TActorId TActivationContext::RegisterWithSameMailbox(IActor* actor, TActorId parentId) { @@ -91,27 +75,27 @@ namespace NActors { } void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie); + ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); } void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie); + ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); } void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(delta, new IEventHandleFat(SelfID, TActorId(), ev), cookie); + ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie); } void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie); + TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie); + TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie); + TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } TInstant TActivationContext::Now() { @@ -161,7 +145,7 @@ namespace NActors { (actor->*StateFunc)(ev, TActivationContext::AsActorContext()); } - void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev) { + void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandle> ev) { Y_VERIFY(!!ev && ev->GetBase()); ev->GetBase()->Execute(actor, std::move(ev)); } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 5e04c0f2ab..e1232c01a8 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -139,16 +139,10 @@ namespace NActors { template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; template <ESendingType SendingType = ESendingType::Common> - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; - template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId)); } template <ESendingType SendingType = ESendingType::Common> - bool Send(const TActorId& recipient, THolder<IEventHandleLight> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { - return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId)); - } - template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId)); } @@ -228,14 +222,6 @@ namespace NActors { template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; - template <ESendingType SendingType = ESendingType::Common> - bool Send(const TActorId& recipient, IEventHandleLight* ev) const; - template <ESendingType SendingType = ESendingType::Common> - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const; - template <ESendingType SendingType = ESendingType::Common> - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const; - template <ESendingType SendingType = ESendingType::Common> - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const; bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; @@ -284,7 +270,7 @@ namespace NActors { class TActorVirtualBehaviour { public: - static void Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev); + static void Receive(IActor* actor, std::unique_ptr<IEventHandle> ev); public: }; @@ -333,8 +319,6 @@ namespace NActors { friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&); friend class TDecorator; protected: - using TReceiveFuncLight = void (IActor::*)(TAutoPtr<IEventHandle>& ev); - TReceiveFuncLight StateFuncLight = nullptr; TActorCallbackBehaviour CImpl; public: using TReceiveFunc = TActorCallbackBehaviour::TReceiveFunc; @@ -512,17 +496,13 @@ namespace NActors { TActorIdentity SelfId() const { return SelfActorId; } - // void Receive(TAutoPtr<IEventHandle> ev, const TActorContext& /*ctx*/) { - // Receive(ev); - // } - void Receive(TAutoPtr<IEventHandle>& ev) { + + void Receive(TAutoPtr<IEventHandle>& ev, const TActorContext& /*ctx*/) { ++HandledEvents; - if (StateFuncLight) { - (this->*StateFuncLight)(ev); - } else if (CImpl.Initialized()) { + if (CImpl.Initialized()) { CImpl.Receive(this, ev); } else { - TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandleFat>(IEventHandleFat::MakeFat(ev).Release())); + TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandle>(ev.Release())); } } @@ -535,10 +515,6 @@ namespace NActors { void Describe(IOutputStream&) const noexcept override; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; - bool Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept; - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept; - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept; - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept; bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{ return Send(recipient, ev.Release(), flags, cookie, std::move(traceId)); } @@ -639,78 +615,8 @@ namespace NActors { } }; - template<typename TDerived> - class TActorCallback : public IActorCallback { - public: - using TFatReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&, const TActorContext&); - using TLightReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&); - - private: - void BecomeFat(TFatReceiveFunc stateFunc) { - IActorCallback::Become(stateFunc); - } - - template<typename... TArgs> - void BecomeFat(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) { - IActorCallback::Become(stateFunc, ctx, std::forward<TArgs>(args)...); - } - - template<typename... TArgs> - void BecomeFat(TFatReceiveFunc stateFunc, TArgs&&... args) { - IActorCallback::Become(stateFunc, std::forward<TArgs>(args)...); - } - - void BecomeLight(TLightReceiveFunc stateFuncLight) { - StateFuncLight = static_cast<TReceiveFuncLight>(stateFuncLight); - } - - public: - TActorCallback(TFatReceiveFunc stateFunc, ui32 activityType = OTHER) - : IActorCallback(static_cast<TReceiveFunc>(stateFunc), activityType) - { - } - - TActorCallback(TLightReceiveFunc stateFunc, ui32 activityType = OTHER) - : IActorCallback(nullptr, activityType) - { - Become(stateFunc); - } - - template<typename T> - void Become(T stateFunc) { - IActorCallback::Become(stateFunc); - } - - template<typename T, typename... TArgs> - void Become(T stateFunc, const TActorContext&, TArgs&&... args) { - IActorCallback::Become(stateFunc); - Schedule(std::forward<TArgs>(args)...); - } - - template<typename T, typename... TArgs> - void Become(T stateFunc, TArgs&&... args) { - IActorCallback::Become(stateFunc); - Schedule(std::forward<TArgs>(args)...); - } - - template<> - void Become<TLightReceiveFunc>(TLightReceiveFunc stateFunc) { - BecomeLight(stateFunc); - } - - template<> - void Become<TFatReceiveFunc>(TFatReceiveFunc stateFunc) { - BecomeFat(stateFunc); - } - - template<typename... TArgs> - void Become(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) { - BecomeFat(stateFunc, ctx, std::forward<TArgs>(args)...); - } - }; - template <typename TDerived> - class TActor: public TActorCallback<TDerived> { + class TActor: public IActorCallback { private: template <typename T, typename = const char*> struct HasActorName: std::false_type { }; @@ -741,13 +647,8 @@ namespace NActors { // static constexpr char ActorName[] = "UNNAMED"; - TActor(typename TActorCallback<TDerived>::TFatReceiveFunc func, ui32 activityType = GetActivityTypeIndex()) - : TActorCallback<TDerived>(func, activityType) - { - } - - TActor(typename TActorCallback<TDerived>::TLightReceiveFunc func, ui32 activityType = GetActivityTypeIndex()) - : TActorCallback<TDerived>(func, activityType) + TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext&), ui32 activityType = GetActivityTypeIndex()) + : IActorCallback(static_cast<TReceiveFunc>(func), activityType) { } @@ -758,10 +659,8 @@ namespace NActors { #define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx #define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev -#define LIGHTFN_SIG TAutoPtr<::NActors::IEventHandle>& ev #define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx) #define STATEFN(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ) -#define LIGHTFN(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev) #define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype); @@ -773,26 +672,13 @@ namespace NActors { UNHANDLED_MSG_HANDLER \ } -#define LIGHTFN_BODY(HANDLERS, UNHANDLED_MSG_HANDLER) \ - switch (const ui32 etype = ev->GetTypeRewrite()) { \ - HANDLERS \ - default: \ - UNHANDLED_MSG_HANDLER \ - } - #define STRICT_STFUNC_BODY(HANDLERS) STFUNC_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER) -#define STRICT_LIGHTFN_BODY(HANDLERS) LIGHTFN_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER) #define STRICT_STFUNC(NAME, HANDLERS) \ void NAME(STFUNC_SIG) { \ STRICT_STFUNC_BODY(HANDLERS) \ } -#define STRICT_LIGHTFN(NAME, HANDLERS) \ - void NAME(LIGHTFN_SIG) { \ - STRICT_LIGHTFN_BODY(HANDLERS) \ - } - #define STRICT_STFUNC_EXC(NAME, HANDLERS, EXCEPTION_HANDLERS) \ void NAME(STFUNC_SIG) { \ try { \ @@ -837,7 +723,7 @@ namespace NActors { STFUNC(State) { if (DoBeforeReceiving(ev, ctx)) { - Actor->Receive(ev); + Actor->Receive(ev, ctx); DoAfterReceiving(ctx); } } @@ -934,25 +820,17 @@ namespace NActors { template <ESendingType SendingType> bool TActivationContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) { - IEventHandle::Forward(ev, recipient); - return Send(ev); + return Send(IEventHandle::Forward(ev, recipient)); } template <ESendingType SendingType> bool TActivationContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) { - IEventHandle::Forward(ev, recipient); - return Send(ev); + return Send(IEventHandle::Forward(ev, recipient)); } template <ESendingType SendingType> bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return Send<SendingType>(new IEventHandleFat(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); - } - - template <ESendingType SendingType> - bool TActorContext::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - ev->PrepareSend(recipient, SelfID, flags, cookie, std::move(traceId)); - return Send<SendingType>(ev); + return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); } template <ESendingType SendingType> @@ -962,14 +840,12 @@ namespace NActors { template <ESendingType SendingType> bool TActorContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const { - IEventHandle::Forward(ev, recipient); - return ExecutorThread.Send<SendingType>(ev); + return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient)); } template <ESendingType SendingType> bool TActorContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const { - IEventHandle::Forward(ev, recipient); - return ExecutorThread.Send<SendingType>(ev); + return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient)); } template <ESendingType SendingType> @@ -984,31 +860,7 @@ namespace NActors { template <ESendingType SendingType> bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return TActivationContext::Send<SendingType>(new IEventHandleFat(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); - } - - template <ESendingType SendingType> - bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev) const { - ev->PrepareSend(recipient, *this); - return TActivationContext::Send<SendingType>(ev); - } - - template <ESendingType SendingType> - bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const { - ev->PrepareSend(recipient, *this, flags); - return TActivationContext::Send<SendingType>(ev); - } - - template <ESendingType SendingType> - bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const { - ev->PrepareSend(recipient, *this, flags, cookie); - return TActivationContext::Send<SendingType>(ev); - } - - template <ESendingType SendingType> - bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - ev->PrepareSend(recipient, *this, flags, cookie, std::move(traceId)); - return TActivationContext::Send<SendingType>(ev); + return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); } template <ESendingType SendingType> diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h index 37286d48e2..9d89afcf70 100644 --- a/library/cpp/actors/core/actor_bootstrapped.h +++ b/library/cpp/actors/core/actor_bootstrapped.h @@ -12,7 +12,7 @@ namespace NActors { class TActorBootstrapped: public TActor<TDerived> { protected: TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { - return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0); + return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0); } STFUNC(StateBootstrap) { diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index 0fd54ff812..51e21e2950 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -41,9 +41,9 @@ namespace NActors { } THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TMonotonic deadline) { - IEventHandleFat *timeoutEv = nullptr; + IEventHandle *timeoutEv = nullptr; if (deadline != TMonotonic::Max()) { - TActivationContext::Schedule(deadline, timeoutEv = new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, + TActivationContext::Schedule(deadline, timeoutEv = new IEventHandle(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, nullptr, 0)); } diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index fdf928a803..7a7844cc85 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -57,8 +57,8 @@ namespace NActors { THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max()); // Wait for specific event set by filter functor. Function returns first event that matches filter. On any other - // kind of event processUnexpectedEvent() is called. // + // kind of event processUnexpectedEvent() is called. // Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; }); template <typename TFunc, typename TCallback, typename = std::enable_if_t<std::is_invocable_v<TCallback, TAutoPtr<IEventHandle>>>> THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TCallback processUnexpectedEvent, TMonotonic deadline = TMonotonic::Max()) { @@ -117,8 +117,7 @@ namespace NActors { bool Send(TAutoPtr<IEventHandle> ev); bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) { - IEventHandle::Forward(ev, recipient); - return Send(ev.Release()); + return Send(IEventHandle::Forward(ev, recipient).Release()); } void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) { @@ -166,7 +165,7 @@ namespace NActors { ~TActorCoro(); TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override { - return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0); + return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0); } private: diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index fe492e531e..86b19af546 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { ctx.RegisterWithSameMailbox(new TDummyActor()); } if (Role == ERole::Leader) { - TAutoPtr<IEventHandle> ev = new IEventHandleFat(Receiver, SelfId(), new TEvents::TEvPing()); + TAutoPtr<IEventHandle> ev = new IEventHandle(Receiver, SelfId(), new TEvents::TEvPing()); SpecialSend(ev, ctx); } } @@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } if (AllocatesMemory) { - SpecialSend(new IEventHandleFat(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx); + SpecialSend(new IEventHandle(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx); } else { std::swap(*const_cast<TActorId*>(&ev->Sender), *const_cast<TActorId*>(&ev->Recipient)); ev->DropRewrite(); @@ -541,14 +541,14 @@ Y_UNIT_TEST_SUITE(TestDecorator) { { } - bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { + bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) override { *Counter += 1; if (ev->Type != TEvents::THelloWorld::Pong) { - TAutoPtr<IEventHandle> pingEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPing()); + TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing()); SavedEvent = ev; - Actor->Receive(pingEv); + Actor->Receive(pingEv, ctx); } else { - Actor->Receive(SavedEvent); + Actor->Receive(SavedEvent, ctx); } return false; } @@ -566,7 +566,7 @@ Y_UNIT_TEST_SUITE(TestDecorator) { bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { *Counter += 1; if (ev->Type == TEvents::THelloWorld::Ping) { - TAutoPtr<IEventHandle> pongEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPong()); + TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong()); Send(SelfId(), new TEvents::TEvPong()); return false; } @@ -701,7 +701,7 @@ Y_UNIT_TEST_SUITE(TestStateFunc) { auto sender = runtime.AllocateEdgeActor(); auto testActor = runtime.Register(new TTestActorWithExceptionsStateFunc()); for (ui64 tag = 0; tag < 4; ++tag) { - runtime.Send(new IEventHandleFat(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true); + runtime.Send(new IEventHandle(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender); UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Tag, tag); } diff --git a/library/cpp/actors/core/actor_virtual.h b/library/cpp/actors/core/actor_virtual.h index 1306252b58..c9c34c4729 100644 --- a/library/cpp/actors/core/actor_virtual.h +++ b/library/cpp/actors/core/actor_virtual.h @@ -8,7 +8,7 @@ template <class TEvent> class TEventContext { private: TEvent* Event; - std::unique_ptr<IEventHandleFat> Handle; + std::unique_ptr<IEventHandle> Handle; public: const TEvent* operator->() const { return Event; @@ -16,7 +16,7 @@ public: const IEventHandle& GetHandle() const { return *Handle; } - TEventContext(std::unique_ptr<IEventHandleFat> handle) + TEventContext(std::unique_ptr<IEventHandle> handle) : Handle(std::move(handle)) { Y_VERIFY_DEBUG(dynamic_cast<TEvent*>(Handle->GetBase())); @@ -28,7 +28,7 @@ public: template <class TEvent, class TExpectedActor> class IEventForActor: public IEventBase { protected: - virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override { + virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override { Y_VERIFY_DEBUG(dynamic_cast<TExpectedActor*>(actor)); auto* actorCorrect = static_cast<TExpectedActor*>(actor); TEventContext<TEvent> context(std::move(eventPtr)); @@ -41,7 +41,7 @@ public: template <class TBaseEvent, class TEvent, class TExpectedObject> class IEventForAnything: public TBaseEvent { protected: - virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override { + virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override { auto* objImpl = dynamic_cast<TExpectedObject*>(actor); if (!objImpl) { return false; diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index b33861a966..9a8a0abd01 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -74,7 +74,7 @@ namespace NActors { if (recpNodeId != NodeId && recpNodeId != 0) { // if recipient is not local one - rewrite with forward instruction - //Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable()); + Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable()); Y_VERIFY(ev->Recipient == recipient, "Event rewrite from %s to %s would be lost via interconnect", ev->Recipient.ToString().c_str(), @@ -96,7 +96,7 @@ namespace NActors { } if (target != actorId) { // a race has occured, terminate newly created actor - Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0)); + Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0)); } } recipient = target; @@ -122,7 +122,7 @@ namespace NActors { bool TActorSystem::GenericSend<&IExecutorPool::SpecificSend>(TAutoPtr<IEventHandle> ev) const; bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie) const { - return this->Send(new IEventHandleFat(recipient, DefSelfID, ev, flags, cookie)); + return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie)); } bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev) const { @@ -140,14 +140,6 @@ namespace NActors { } } - bool TActorSystem::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const { - return this->Send(ev->PrepareSend(recipient, DefSelfID, flags, cookie)); - } - - bool TActorSystem::Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags, ui64 cookie) const { - return this->Send(ev->PrepareSend(recipient, sender, flags, cookie)); - } - void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { Schedule(deadline - Timestamp(), ev, cookie); } diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 0bc788f9be..b9ecaa2b9e 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -204,8 +204,6 @@ namespace NActors { bool SpecificSend(TAutoPtr<IEventHandle> ev) const; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const; - bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const; - bool Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const; /** * Schedule one-shot event that will be send at given time point in the future. diff --git a/library/cpp/actors/core/ask.cpp b/library/cpp/actors/core/ask.cpp index e3fe6d9ee3..40c6748d56 100644 --- a/library/cpp/actors/core/ask.cpp +++ b/library/cpp/actors/core/ask.cpp @@ -47,9 +47,9 @@ namespace NActors { if (ev->GetTypeRewrite() == TTimeout::EventType) { Promise_.SetException(std::make_exception_ptr(yexception() << "ask timeout")); } else if (!ExpectedEventType_ || ev->GetTypeRewrite() == ExpectedEventType_) { - Promise_.SetValue(IEventHandleFat::GetFat(ev.Get())->ReleaseBase()); + Promise_.SetValue(ev.Get()->ReleaseBase()); } else { - Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << IEventHandleFat::GetFat(ev.Get())->GetBase()->ToString())); + Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << ev.Get()->GetBase()->ToString())); } PassAway(); diff --git a/library/cpp/actors/core/av_bootstrapped.cpp b/library/cpp/actors/core/av_bootstrapped.cpp index e112302792..771177242e 100644 --- a/library/cpp/actors/core/av_bootstrapped.cpp +++ b/library/cpp/actors/core/av_bootstrapped.cpp @@ -7,7 +7,7 @@ public: }; TAutoPtr<NActors::IEventHandle> TActorAutoStart::AfterRegister(const TActorId& self, const TActorId& parentId) { - return new IEventHandleFat(self, parentId, new TEventForStart, 0); + return new IEventHandle(self, parentId, new TEventForStart, 0); } void TActorAutoStart::ProcessEvent(TEventContext<TEventForStart>& ev) { diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index b963dfb4ea..6ffe42f65b 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -7,79 +7,19 @@ namespace NActors { Max<ui64>(), Max<ui64>() }; - TAutoPtr<IEventHandle>& IEventHandle::Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient) { - if (ev->IsEventLight()) { - IEventHandleLight::GetLight(ev.Get())->Forward(recipient); - } else { - ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient); - } - return ev; - } - - THolder<IEventHandle>& IEventHandle::Forward(THolder<IEventHandle>& ev, TActorId recipient) { - if (ev->IsEventLight()) { - IEventHandleLight::GetLight(ev.Get())->Forward(recipient); - } else { - ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient); - } - return ev; - } - TString IEventHandle::GetTypeName() const { - if (IsEventFat()) { - auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this)); - return ev->HasEvent() ? TypeName(*(ev->GetBase())) : TypeName(*this); - } else { - return TypeName(*this); - } + return HasEvent() ? TypeName(*(const_cast<IEventHandle*>(this)->GetBase())) : TypeName(*this); } TString IEventHandle::ToString() const { - if (IsEventFat()) { - auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this)); - return ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?"; - } else { - // TODO(xenoxeno): - return TypeName(*this); - } + return HasEvent() ? const_cast<IEventHandle*>(this)->GetBase()->ToString().data() : "serialized?"; } - bool IEventHandle::HasEvent() const { - if (IsEventLight()) { - return true; - } else { - return IEventHandleFat::GetFat(this)->HasEvent(); - } + std::unique_ptr<IEventHandle> IEventHandle::Forward(std::unique_ptr<IEventHandle>&& ev, TActorId recipient) { + return std::unique_ptr<IEventHandle>(ev->Forward(recipient).Release()); } - bool IEventHandle::HasBuffer() const { - if (IsEventLight()) { - return false; - } else { - return IEventHandleFat::GetFat(this)->HasBuffer(); - } - } - - TActorId IEventHandle::GetForwardOnNondeliveryRecipient() const { - if (IsEventLight()) { - return {}; - } else { - return IEventHandleFat::GetFat(this)->GetForwardOnNondeliveryRecipient(); - } - } - - size_t IEventHandle::GetSize() const { - if (IsEventLight()) { - if (IsEventSerializable()) { - return IEventHandleLightSerializable::GetLightSerializable(this)->GetSize(); - } - } else { - return IEventHandleFat::GetFat(this)->GetSize(); - } - return 0; - } - - TIntrusivePtr<TEventSerializedData> IEventHandleFat::ReleaseChainBuffer() { + TIntrusivePtr<TEventSerializedData> IEventHandle::ReleaseChainBuffer() { if (Buffer) { TIntrusivePtr<TEventSerializedData> result; DoSwap(result, Buffer); @@ -96,7 +36,7 @@ namespace NActors { return new TEventSerializedData; } - TIntrusivePtr<TEventSerializedData> IEventHandleFat::GetChainBuffer() { + TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { if (Buffer) { return Buffer; } @@ -108,6 +48,4 @@ namespace NActors { } return new TEventSerializedData; } - - std::vector<std::vector<IEventFactory*>*> TEventFactories::EventFactories; } diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 71c29082d8..5c2cc726ef 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -9,7 +9,6 @@ #include <util/system/hp_timer.h> #include <util/generic/maybe.h> -#include <util/string/builder.h> namespace NActors { class TChunkSerializer; @@ -24,7 +23,7 @@ namespace NActors { public ISerializerToStream { protected: // for compatibility with virtual actors - virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandleFat> /*eventPtr*/) { + virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandle> /*eventPtr*/) { Y_VERIFY_DEBUG(false); return false; } @@ -34,7 +33,7 @@ namespace NActors { virtual ~IEventBase() { } - bool Execute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) { + bool Execute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) { return DoExecute(actor, std::move(eventPtr)); } @@ -54,70 +53,8 @@ namespace NActors { virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; } }; - struct IEventHandleFields { - enum EFlags : ui32 { - FlagTrackDelivery = 1 << 0, - FlagForwardOnNondelivery = 1 << 1, - FlagSubscribeOnSession = 1 << 2, - FlagUseSubChannel = 1 << 3, - FlagGenerateUnsureUndelivered = 1 << 4, - FlagExtendedFormat = 1 << 5, - FlagLight = 1 << 18, - FlagSerializable = 1 << 19, - }; - - static constexpr ui32 STICKY_FLAGS = (FlagLight | FlagSerializable); - - TActorId Recipient = {}; - TActorId Sender = {}; - ui32 Type = {}; - -#pragma pack(push, 1) - union { - ui32 Flags = {}; - struct { - ui32 NormalFlags:6; - ui32 ReservedFlags:12; - ui32 StickyFlags:2; - ui32 Channel:12; - }; - struct { - // flags - ui32 TrackDelivery:1; - ui32 ForwardOnNondeliveryFlag:1; - ui32 SubscribeOnSession:1; - ui32 UseSubChannel:1; - ui32 GenerateUnsureUndelivered:1; - ui32 ExtendedFormat:1; - // reserved - ui32 Reserved:12; - // sticky flags - ui32 Light:1; - ui32 Serializable:1; - }; - }; -#pragma pack(pop) - - ui64 Cookie = {}; - - TActorId RewriteRecipient = {}; - ui32 RewriteType = {}; - - NWilson::TTraceId TraceId = {}; - -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - ::NHPTimer::STime SendTime = 0; -#endif - }; - - class IEventHandle : public IEventHandleFields { - public: - IEventHandle(IEventHandleFields&& fields = {}) - : IEventHandleFields(std::move(fields)) - {} - - virtual ~IEventHandle() = default; - + // fat handle + class IEventHandle : TNonCopyable { struct TOnNondelivery { TActorId Recipient; @@ -127,143 +64,29 @@ namespace NActors { } }; - THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events - - ui16 GetChannel() const noexcept { - return Channel; - } - - ui64 GetSubChannel() const noexcept { - return UseSubChannel ? Sender.LocalId() : 0ULL; - } - - // deprecate(xenoxeno) ? - static const size_t ChannelBits = 12; - static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits; - - static ui32 MakeFlags(ui32 channel, ui32 flags) { - Y_VERIFY(channel < (1 << ChannelBits)); - Y_VERIFY(flags < (1 << ChannelShift)); - return (flags | (channel << ChannelShift)); - } - // - - void SetFlags(ui32 flags) { - Flags = (flags & ~STICKY_FLAGS) | (Flags & STICKY_FLAGS); - } - - const TActorId& GetRecipientRewrite() const { - return RewriteRecipient; - } - - void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) { - RewriteRecipient = recipientRewrite; - RewriteType = typeRewrite; - } - - void DropRewrite() { - RewriteRecipient = Recipient; - RewriteType = Type; - } - - ui32 GetTypeRewrite() const { - return RewriteType; - } - - bool IsEventLight() const { - return Light; - } - - bool IsEventFat() const { - return !Light; - } - - bool IsEventSerializable() const { - return Serializable; - } - - template<typename TEventType> - TEventType* Get(); - template<typename TEventType> - TEventType* CastAsLocal(); - template<typename TEventType> - TEventType* StaticCastAsLocal(); - bool HasEvent() const; - bool HasBuffer() const; - TString GetTypeName() const; - TString ToString() const; - size_t GetSize() const; - static TAutoPtr<IEventHandle>& Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient); - static THolder<IEventHandle>& Forward(THolder<IEventHandle>& ev, TActorId recipient); - static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure = false); - static TAutoPtr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure = false); - template<typename TEventType> - static TEventType* Release(TAutoPtr<IEventHandle>&); - template<typename TEventType> - static TEventType* Release(THolder<IEventHandle>&); - template<typename TEventType> - static TEventType* Release(std::unique_ptr<IEventHandle>&); - - template<typename TEventTypeSmartPtr> - static TAutoPtr<IEventHandle> ForwardOnNondelivery(TEventTypeSmartPtr& ev, ui32 reason, bool unsure = false) { - TAutoPtr<IEventHandle> evi(ev.Release()); - return ForwardOnNondelivery(evi, reason, unsure); - } - - TActorId GetForwardOnNondeliveryRecipient() const; - }; - - // fat handle - class IEventHandleFat : public IEventHandle, TNonCopyable { public: template <typename TEv> - inline TEv* CastAsLocal() const noexcept { // cast with event check + inline TEv* CastAsLocal() const noexcept { auto fits = GetTypeRewrite() == TEv::EventType; - constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());}; - if constexpr (couldBeCasted) { - return fits ? static_cast<TEv*>(Event.Get()) : nullptr; - } else { - Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data()); - } - } - template <typename TEv> - inline TEv* StaticCastAsLocal() const noexcept { // blind cast - constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());}; - if constexpr (couldBeCasted) { - return static_cast<TEv*>(Event.Get()); - } else { - Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data()); - } + return fits ? static_cast<TEv*>(Event.Get()) : nullptr; } template <typename TEventType> TEventType* Get() { if (Type != TEventType::EventType) - Y_FAIL("%s", (TStringBuilder() - << "Event type " << Type - << " doesn't match the expected type " << TEventType::EventType - << " requested typename " << TypeName<TEventType>() - << " actual typename " << GetTypeName()).data()); - - constexpr bool couldBeGot = requires() { - Event.Reset(TEventType::Load(Buffer.Get())); - static_cast<TEventType*>(Event.Get()); - }; - if constexpr (couldBeGot) { - if (!Event) { - static TEventSerializedData empty; - Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty)); - } - - if (Event) { - return static_cast<TEventType*>(Event.Get()); - } - - Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data()); - } else { - Y_FAIL("Event type %" PRIu32 " couldn't be get as type %s", Type, TypeName<TEventType>().data()); + Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType); + + if (!Event) { + static TEventSerializedData empty; + Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty)); + } + + if (Event) { + return static_cast<TEventType*>(Event.Get()); } + + Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data()); } template <typename T> @@ -274,43 +97,104 @@ namespace NActors { return x; } + enum EFlags { + FlagTrackDelivery = 1 << 0, + FlagForwardOnNondelivery = 1 << 1, + FlagSubscribeOnSession = 1 << 2, + FlagUseSubChannel = 1 << 3, + FlagGenerateUnsureUndelivered = 1 << 4, + FlagExtendedFormat = 1 << 5, + }; + + const ui32 Type; + const ui32 Flags; + const TActorId Recipient; + TActorId Sender; + const ui64 Cookie; const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect + // if set, used by ActorSystem/Interconnect to report tracepoints + NWilson::TTraceId TraceId; + // filled if feeded by interconnect session const TActorId InterconnectSession; +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + ::NHPTimer::STime SendTime; +#endif + + static const size_t ChannelBits = 12; + static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits; #ifdef USE_ACTOR_CALLSTACK TCallstack Callstack; #endif + ui16 GetChannel() const noexcept { + return Flags >> ChannelShift; + } + + ui64 GetSubChannel() const noexcept { + return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL; + } + + static ui32 MakeFlags(ui32 channel, ui32 flags) { + Y_VERIFY(channel < (1 << ChannelBits)); + Y_VERIFY(flags < (1 << ChannelShift)); + return (flags | (channel << ChannelShift)); + } + private: THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; + TActorId RewriteRecipient; + ui32 RewriteType; + + THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events + public: + void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) { + RewriteRecipient = recipientRewrite; + RewriteType = typeRewrite; + } + + void DropRewrite() { + RewriteRecipient = Recipient; + RewriteType = Type; + } + + const TActorId& GetRecipientRewrite() const { + return RewriteRecipient; + } + + ui32 GetTypeRewrite() const { + return RewriteType; + } + TActorId GetForwardOnNondeliveryRecipient() const { return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId(); } - IEventHandleFat(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, + IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) - : IEventHandle({ - .Recipient = recipient, - .Sender = sender, - .Type = ev->Type(), - .Flags = flags, - .Cookie = cookie, - .RewriteRecipient = recipient, - .RewriteType = ev->Type(), - .TraceId = std::move(traceId), - }) + : Type(ev->Type()) + , Flags(flags) + , Recipient(recipient) + , Sender(sender) + , Cookie(cookie) + , TraceId(std::move(traceId)) +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + , SendTime(0) +#endif , Event(ev) + , RewriteRecipient(Recipient) + , RewriteType(Type) { if (forwardOnNondelivery) OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery)); } - IEventHandleFat(ui32 type, + IEventHandle(ui32 type, ui32 flags, const TActorId& recipient, const TActorId& sender, @@ -318,24 +202,25 @@ namespace NActors { ui64 cookie, const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) - : IEventHandle({ - .Recipient = recipient, - .Sender = sender, - .Type = type, - .Flags = flags, - .Cookie = cookie, - .RewriteRecipient = recipient, - .RewriteType = type, - .TraceId = std::move(traceId), - }) + : Type(type) + , Flags(flags) + , Recipient(recipient) + , Sender(sender) + , Cookie(cookie) + , TraceId(std::move(traceId)) +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + , SendTime(0) +#endif , Buffer(std::move(buffer)) + , RewriteRecipient(Recipient) + , RewriteType(Type) { if (forwardOnNondelivery) OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery)); } // Special ctor for events from interconnect. - IEventHandleFat(const TActorId& session, + IEventHandle(const TActorId& session, ui32 type, ui32 flags, const TActorId& recipient, @@ -344,19 +229,20 @@ namespace NActors { ui64 cookie, TScopeId originScopeId, NWilson::TTraceId traceId) noexcept - : IEventHandle({ - .Recipient = recipient, - .Sender = sender, - .Type = type, - .Flags = flags, - .Cookie = cookie, - .RewriteRecipient = recipient, - .RewriteType = type, - .TraceId = std::move(traceId), - }) + : Type(type) + , Flags(flags) + , Recipient(recipient) + , Sender(sender) + , Cookie(cookie) , OriginScopeId(originScopeId) + , TraceId(std::move(traceId)) , InterconnectSession(session) +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + , SendTime(0) +#endif , Buffer(std::move(buffer)) + , RewriteRecipient(Recipient) + , RewriteType(Type) { } @@ -401,267 +287,63 @@ namespace NActors { TAutoPtr<IEventHandle> Forward(const TActorId& dest) { if (Event) - return new IEventHandleFat(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId)); + return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId)); else - return new IEventHandleFat(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId)); + return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId)); } - static TAutoPtr<IEventHandleFat> MakeFat(TAutoPtr<IEventHandle> ev) { - if (ev->IsEventLight()) { - Y_FAIL("Can't make light event fat"); - } else { - TAutoPtr<IEventHandleFat> evb(static_cast<IEventHandleFat*>(ev.Release())); - return evb; - } - } + TString GetTypeName() const; + TString ToString() const; - static IEventHandleFat* GetFat(IEventHandle* ev) { - if (ev->IsEventLight()) { - Y_FAIL("Can't make light event fat"); - } else { - return static_cast<IEventHandleFat*>(ev); - } - } + [[nodiscard]] static std::unique_ptr<IEventHandle> Forward(std::unique_ptr<IEventHandle>&& ev, TActorId recipient); + [[nodiscard]] static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>&& ev, ui32 reason, bool unsure = false); - static const IEventHandleFat* GetFat(const IEventHandle* ev) { - if (ev->IsEventLight()) { - Y_FAIL("Can't make light event fat"); - } else { - return static_cast<const IEventHandleFat*>(ev); - } + [[nodiscard]] static TAutoPtr<IEventHandle> Forward(TAutoPtr<IEventHandle>&& ev, TActorId recipient) { + return Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release(); } - static IEventHandleFat* GetFat(TAutoPtr<IEventHandle>& ev) { - return GetFat(ev.Get()); + [[nodiscard]] static THolder<IEventHandle> Forward(THolder<IEventHandle>&& ev, TActorId recipient) { + return THolder(Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release()); } - static IEventHandleFat* GetFat(THolder<IEventHandle>& ev) { - return GetFat(ev.Get()); + [[nodiscard]] static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>&& ev, ui32 reason, bool unsure = false) { + return ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release(); } - static IEventHandleFat* GetFat(std::unique_ptr<IEventHandle>& ev) { - return GetFat(ev.get()); + [[nodiscard]] static THolder<IEventHandle> ForwardOnNondelivery(THolder<IEventHandle>&& ev, ui32 reason, bool unsure = false) { + return THolder(ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release()); } - static TAutoPtr<IEventHandle> MakeBase(TAutoPtr<IEventHandleFat> ev) { - return ev.Release(); + template<typename T> + static TAutoPtr<T> Release(TAutoPtr<IEventHandle>& ev) { + return ev->Release<T>(); } - static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure = false); - }; - - template <typename TEventType> - class TEventHandleFat: public IEventHandleFat { - TEventHandleFat(); // we never made instance of TEventHandleFat - public: - TEventType* Get() { - return IEventHandleFat::Get<TEventType>(); + template<typename T> + static TAutoPtr<T> Release(THolder<IEventHandle>& ev) { + return ev->Release<T>(); } - TAutoPtr<TEventType> Release() { - return IEventHandleFat::Release<TEventType>(); + template <typename TEv> + inline TEv* StaticCastAsLocal() const noexcept { // blind cast + return static_cast<TEv*>(Event.Get()); } }; - static_assert(sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat), "expect sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat)"); - - // light handle - class IEventHandleLight : public IEventHandle { + template <typename TEventType> + class TEventHandle: public IEventHandle { + TEventHandle(); // we never made instance of TEventHandle public: - IEventHandleLight(ui32 type) { - RewriteType = Type = type; - Light = true; - } - - IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender) { - RewriteRecipient = Recipient = recipient; - Sender = sender; - return this; - } - - IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags) { - RewriteRecipient = Recipient = recipient; - Sender = sender; - SetFlags(flags); - return this; - } - - IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie) { - RewriteRecipient = Recipient = recipient; - Sender = sender; - SetFlags(flags); - Cookie = cookie; - return this; - } - - IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) { - RewriteRecipient = Recipient = recipient; - Sender = sender; - SetFlags(flags); - Cookie = cookie; - TraceId = std::move(traceId); - return this; - } - - void Forward(const TActorId& dest) { - RewriteRecipient = dest; - } - - static IEventHandleLight* GetLight(IEventHandle* ev) { - if (ev->IsEventLight()) { - return static_cast<IEventHandleLight*>(ev); - } else { - Y_FAIL("Can't make fat event light"); - } - } - - static IEventHandleLight* GetLight(TAutoPtr<IEventHandle>& ev) { - return GetLight(ev.Get()); - } - - static IEventHandleLight* GetLight(THolder<IEventHandle>& ev) { - return GetLight(ev.Get()); - } - - static IEventHandleLight* GetLight(std::unique_ptr<IEventHandle>& ev) { - return GetLight(ev.get()); - } - - static IEventHandleLight* ReleaseLight(TAutoPtr<IEventHandle>& ev) { - return GetLight(ev.Release()); - } - - static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure = false); - - template<typename TEventType> TEventType* Get() { - if (Type != TEventType::EventType) { - Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType); - } - constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);}; - if constexpr (couldBeConverted) { - return static_cast<TEventType*>(this); - } else { - Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data()); - } - } - - template<typename TEventType> - TEventType* CastAsLocal() { - constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);}; - if constexpr (couldBeConverted) { - if (Type == TEventType::EventType) { - return static_cast<TEventType*>(this); - } - } - return nullptr; + return IEventHandle::Get<TEventType>(); } - template<typename TEventType> - TEventType* StaticCastAsLocal() { - constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);}; - if constexpr (couldBeConverted) { - return static_cast<TEventType*>(this); - } - Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data()); + TAutoPtr<TEventType> Release() { + return IEventHandle::Release<TEventType>(); } }; - template<typename TEventType> - TEventType* IEventHandle::Get() { - if (IsEventLight()) { - return IEventHandleLight::GetLight(this)->Get<TEventType>(); - } else { - return IEventHandleFat::GetFat(this)->Get<TEventType>(); - } - } - - template<typename TEventType> - TEventType* IEventHandle::CastAsLocal() { - if (IsEventLight()) { - return IEventHandleLight::GetLight(this)->CastAsLocal<TEventType>(); - } else { - return IEventHandleFat::GetFat(this)->CastAsLocal<TEventType>(); - } - } - - template<typename TEventType> - TEventType* IEventHandle::StaticCastAsLocal() { - if (IsEventLight()) { - return IEventHandleLight::GetLight(this)->StaticCastAsLocal<TEventType>(); - } else { - return IEventHandleFat::GetFat(this)->StaticCastAsLocal<TEventType>(); - } - } - - template<typename TEventType> - TEventType* IEventHandle::Release(TAutoPtr<IEventHandle>& ev) { - if (ev->IsEventLight()) { - return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>(); - } else { - return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release(); - } - } - - template<typename TEventType> - TEventType* IEventHandle::Release(THolder<IEventHandle>& ev) { - if (ev->IsEventLight()) { - return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>(); - } else { - return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release(); - } - } - - template<typename TEventType> - TEventType* IEventHandle::Release(std::unique_ptr<IEventHandle>& ev) { - if (ev->IsEventLight()) { - return IEventHandleLight::GetLight(ev.release())->CastAsLocal<TEventType>(); - } else { - return IEventHandleFat::GetFat(ev.get())->Release<TEventType>().Release(); - } - } - - - class IEventHandleLightSerializable; - - using TEventSerializer = std::function<bool(const IEventHandleLightSerializable*, TChunkSerializer*)>; - - class IEventHandleLightSerializable : public IEventHandleLight { - public: - TEventSerializer Serializer; - - IEventHandleLightSerializable(ui32 type, TEventSerializer serializer) - : IEventHandleLight(type) - , Serializer(std::move(serializer)) - { - Serializable = true; - } - - static IEventHandleLightSerializable* GetLightSerializable(IEventHandle* ev) { - if (ev->IsEventSerializable()) { - return static_cast<IEventHandleLightSerializable*>(ev); - } else { - Y_FAIL("Can't make serializable event"); - } - } - - static const IEventHandleLightSerializable* GetLightSerializable(const IEventHandle* ev) { - if (ev->IsEventSerializable()) { - return static_cast<const IEventHandleLightSerializable*>(ev); - } else { - Y_FAIL("Can't make serializable event"); - } - } - - static IEventHandleLightSerializable* GetLightSerializable(TAutoPtr<IEventHandle>& ev) { - return GetLightSerializable(ev.Get()); - } - - size_t GetSize() const { - // TODO(xenoxeno) - return 0; - } - }; + static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)"); template <typename TEventType, ui32 EventType0> class TEventBase: public IEventBase { @@ -672,7 +354,7 @@ namespace NActors { } // still abstract - typedef TEventHandleFat<TEventType> THandle; + typedef TEventHandle<TEventType> THandle; typedef TAutoPtr<THandle> TPtr; }; @@ -681,7 +363,7 @@ namespace NActors { return TString(header); \ } \ bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \ - Y_FAIL("Local event %s is not serializable", TypeName(*this).data()); \ + Y_FAIL("Local event " #eventType " is not serializable"); \ } \ static IEventBase* Load(NActors::TEventSerializedData*) { \ Y_FAIL("Local event " #eventType " has no load method"); \ @@ -703,39 +385,4 @@ namespace NActors { bool IsSerializable() const override { \ return true; \ } - - - struct TEventMeta { - TActorId Session; - ui32 Type; - ui32 Flags; - TActorId Recipient; - TActorId Sender; - ui64 Cookie; - TScopeId OriginScopeId; - NWilson::TTraceId TraceId; - TRope Data; - - bool IsExtendedFormat() const { - return Flags & IEventHandle::FlagExtendedFormat; - } - }; - - inline constexpr ui32 GetEventSpace(ui32 eventType) { - return (eventType >> 16u); - } - - inline constexpr ui32 GetEventSubType(ui32 eventType) { - return (eventType & (0xffff)); - } - - struct IEventFactory { - virtual IEventHandle* Construct(const TEventMeta& eventMeta) = 0; - virtual void Destruct(IEventHandle*) = 0; - }; - - struct TEventFactories { - // it supposed to be statically allocated in the begining - static std::vector<std::vector<IEventFactory*>*> EventFactories; // [EventSpace][EventSubType] - }; } diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index b845760221..30cc26aa46 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -7,7 +7,7 @@ #include <library/cpp/actors/wilson/wilson_trace.h> namespace NActors { - class IEventHandleFat; + class IEventHandle; struct TConstIoVec { const void* Data; diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h index c5fff6af35..2845aa94dd 100644 --- a/library/cpp/actors/core/event_local.h +++ b/library/cpp/actors/core/event_local.h @@ -71,21 +71,4 @@ namespace NActors { return new TEv(); } }; - - template<typename TEv, ui32 EventType0> - class TEventLight : public IEventHandleLight { - public: - static constexpr ui32 EventType = EventType0; - - TEventLight() - : IEventHandleLight(EventType) - {} - - TEv* Get() { - return static_cast<TEv*>(this); - } - - using THandle = TEv; - using TPtr = TAutoPtr<TEv>; - }; } diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index d56a9b91de..8fd43aa34d 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -513,253 +513,4 @@ namespace NActors { dest->SetRawX1(src.RawX1()); dest->SetRawX2(src.RawX2()); } - - template<ui32 EventSpace> - struct TLightEventSpaceFactories { - public: - static std::vector<IEventFactory*>& GetEventSpaceFactories(ui32 eventSpace) { - if (TEventFactories::EventFactories.size() <= eventSpace) { - // it's only safe to do BEFORE initialization of ActorSystem - TEventFactories::EventFactories.resize(eventSpace + 1); - } - if (TEventFactories::EventFactories[eventSpace] == nullptr) { - TEventFactories::EventFactories[eventSpace] = new std::vector<IEventFactory*>(); - } - return *TEventFactories::EventFactories[eventSpace]; - } - - static void SetEventFactory(ui32 eventType, IEventFactory* factory) { - Y_VERIFY(GetEventSpace(eventType) == EventSpace); - auto eventSpaceFactories = GetEventSpaceFactories(GetEventSpace(eventType)); - if (eventSpaceFactories.size() <= GetEventSubType(eventType)) { - // it's only safe to do BEFORE initialization of ActorSystem - eventSpaceFactories.resize(GetEventSubType(eventType) + 1); - } - eventSpaceFactories[GetEventSubType(eventType)] = factory; - } - }; - - template<typename TEvent> - class TLightEventFactory : public IEventFactory, TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)> { - private: - mutable size_t CachedByteSize = 0; - - static constexpr char PayloadMarker = 0x07; - static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7; - - static size_t SerializeNumber(size_t num, char *buffer) { - char *begin = buffer; - do { - *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00); - num >>= 7; - } while (num); - return buffer - begin; - } - - static size_t DeserializeNumber(const char **ptr, const char *end) { - const char *p = *ptr; - size_t res = 0; - size_t offset = 0; - for (;;) { - if (p == end) { - return Max<size_t>(); - } - const char byte = *p++; - res |= (static_cast<size_t>(byte) & 0x7F) << offset; - offset += 7; - if (!(byte & 0x80)) { - break; - } - } - *ptr = p; - return res; - } - - static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) { - size_t res = 0; - size_t offset = 0; - for (;;) { - if (!iter.Valid()) { - return Max<size_t>(); - } - const char byte = *iter.ContiguousData(); - iter += 1; - --size; - res |= (static_cast<size_t>(byte) & 0x7F) << offset; - offset += 7; - if (!(byte & 0x80)) { - break; - } - } - return res; - } - - public: - void DeserializeProtoEvent(TEvent* event, const TEventMeta& eventMeta) { - TRope::TConstIterator iter = eventMeta.Data.Begin(); - ui64 size = eventMeta.Data.GetSize(); - if (eventMeta.IsExtendedFormat()) { - constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;}; - if constexpr (hasPayload) { - // check marker - if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { - Y_FAIL("Invalid event marker"); - } - // skip marker - iter += 1; - --size; - // parse number of payload ropes - size_t numRopes = DeserializeNumber(iter, size); - if (numRopes == Max<size_t>()) { - Y_FAIL("Invalid event rope number"); - } - while (numRopes--) { - // parse length of the rope - const size_t len = DeserializeNumber(iter, size); - if (len == Max<size_t>() || size < len) { - Y_FAIL("%s", (TStringBuilder() << "Invalid event len# " << len << " size# " << size).data()); - } - // extract the rope - TRope::TConstIterator begin = iter; - iter += len; - size -= len; - event->Payload.emplace_back(begin, iter); - } - } else { - Y_FAIL("%s", (TStringBuilder() << "Extended format is not supported for event " << TypeName<TEvent>()).data()); - } - } - // parse the protobuf - TRopeStream stream(iter, size); - if (!event->Record.ParsePartialFromZeroCopyStream(&stream)) { - Y_FAIL("%s", (TStringBuilder() << "Failed to parse protobuf event type " << eventMeta.Type << " class " << TypeName<TEvent>()).data()); - } - } - - static bool SerializeProtoEvent(const TEvent* event, TChunkSerializer* chunker) { - constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;}; - if constexpr (hasPayload) { - // serialize payload first - if (event->Payload) { - void *data; - int size = 0; - auto append = [&](const char *p, size_t len) { - while (len) { - if (size) { - const size_t numBytesToCopy = std::min<size_t>(size, len); - memcpy(data, p, numBytesToCopy); - data = static_cast<char*>(data) + numBytesToCopy; - size -= numBytesToCopy; - p += numBytesToCopy; - len -= numBytesToCopy; - } else if (!chunker->Next(&data, &size)) { - return false; - } - } - return true; - }; - auto appendNumber = [&](size_t number) { - char buf[MaxNumberBytes]; - return append(buf, SerializeNumber(number, buf)); - }; - char marker = PayloadMarker; - append(&marker, 1); - if (!appendNumber(event->Payload.size())) { - return false; - } - for (const TRope& rope : event->Payload) { - if (!appendNumber(rope.GetSize())) { - return false; - } - if (rope) { - if (size) { - chunker->BackUp(std::exchange(size, 0)); - } - if (!chunker->WriteRope(&rope)) { - return false; - } - } - } - if (size) { - chunker->BackUp(size); - } - } - } - return event->Record.SerializeToZeroCopyStream(chunker); - } - - public: - TEvent* New() { - return new TEvent(); - } - - void Delete(TEvent* event) { - delete event; - } - - virtual IEventHandle* Construct(const TEventMeta& eventMeta) override { - IEventHandle* ev = nullptr; - if (eventMeta.Type == TEvent::EventType) { - TEvent* event = New(); - DeserializeProtoEvent(event, eventMeta); - } - return ev; - } - - virtual void Destruct(IEventHandle* ev) override { - if (ev->Type != TEvent::EventType) { - Y_FAIL("Wrong event supplied"); - } - Delete(static_cast<TEvent*>(ev)); - } - - TLightEventFactory() { - Cerr << "TLightEventFactory<" << TypeName<TEvent>() << ">()" << Endl; - TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)>::SetEventFactory(TEvent::EventType, this); - } - }; - - template<typename TEv> - struct TLightEventFactoryInitializer { - static TLightEventFactory<TEv> EventFactory; - - static TEv* New() { - return EventFactory.New(); - } - - static void Delete(TEv* event) { - EventFactory.Delete(event); - } - }; - - template<typename TEv> - TLightEventFactory<TEv> TLightEventFactoryInitializer<TEv>::EventFactory; - - template<typename TEv, typename TRecord, ui32 EventType0> - class TEventLightPB : public IEventHandleLightSerializable, public TLightEventFactoryInitializer<TEv> { - public: - static constexpr ui32 EventType = EventType0; - - TRecord Record; - - static bool SerializeProto(const IEventHandleLightSerializable* event, TChunkSerializer* chunker) { - return TLightEventFactory<TEv>::SerializeProtoEvent(static_cast<const TEv*>(event), chunker); - } - - TEventLightPB() - : IEventHandleLightSerializable(EventType, &TEventLightPB<TEv, TRecord, EventType0>::SerializeProto) - {} - - TEv* Get() { - return static_cast<TEv*>(this); - } - - using TPtr = TAutoPtr<TEv>; - }; - - template<typename TEv, typename TRecord, ui32 EventType0> - class TEventLightPBWithPayload : public TEventLightPB<TEv, TRecord, EventType0> { - public: - std::vector<TRope> Payload; - }; } diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp index ea5fd10124..70b9ea2d71 100644 --- a/library/cpp/actors/core/events_undelivered.cpp +++ b/library/cpp/actors/core/events_undelivered.cpp @@ -38,68 +38,20 @@ namespace NActors { return new TEvUndelivered(sourceType, reason); } - TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure) { - std::unique_ptr<IEventHandle> tev(ev.Release()); - TAutoPtr<IEventHandle> fw = ForwardOnNondelivery(tev, reason, unsure); - if (tev) { - // we don't want to delete original event handle here - ev = tev.release(); - } - return fw; - } - - TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure) { - if (ev->IsEventFat()) { - std::unique_ptr<IEventHandleFat> evf(IEventHandleFat::GetFat(ev.release())); - std::unique_ptr<IEventHandle> fw = IEventHandleFat::ForwardOnNondelivery(evf, reason, unsure); - if (evf) { - ev = std::unique_ptr<IEventHandle>(evf.release()); - } - return fw.release(); - } - if (ev->IsEventLight()) { - std::unique_ptr<IEventHandleLight> evl(IEventHandleLight::GetLight(ev.release())); - std::unique_ptr<IEventHandle> fw = IEventHandleLight::ForwardOnNondelivery(evl, reason, unsure); - if (evl) { - ev = std::unique_ptr<IEventHandle>(evl.release()); - } - return fw.release(); - } - return {}; - } - - std::unique_ptr<IEventHandle> IEventHandleFat::ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure) { - if (ev->ForwardOnNondeliveryFlag) { + std::unique_ptr<IEventHandle> IEventHandle::ForwardOnNondelivery(std::unique_ptr<IEventHandle>&& ev, ui32 reason, bool unsure) { + if (ev->Flags & FlagForwardOnNondelivery) { const ui32 updatedFlags = ev->Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession); const TActorId recp = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId(); if (ev->Event) - return std::unique_ptr<IEventHandle>(new IEventHandleFat(recp, ev->Sender, ev->Event.Release(), updatedFlags, ev->Cookie, &ev->Recipient, std::move(ev->TraceId))); + return std::unique_ptr<IEventHandle>(new IEventHandle(recp, ev->Sender, ev->Event.Release(), updatedFlags, ev->Cookie, &ev->Recipient, std::move(ev->TraceId))); else - return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Type, updatedFlags, recp, ev->Sender, ev->Buffer, ev->Cookie, &ev->Recipient, std::move(ev->TraceId))); - } - - if (ev->TrackDelivery) { - const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered); - return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags, - ev->Cookie, nullptr, std::move(ev->TraceId))); - } - return {}; - } - - std::unique_ptr<IEventHandle> IEventHandleLight::ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure) { - if (ev->ForwardOnNondeliveryFlag) { - ev->ForwardOnNondeliveryFlag = false; - ev->SubscribeOnSession = false; - auto recpt = ev->Recipient; - ev->Recipient = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId(); - ev->OnNondeliveryHolder = MakeHolder<TOnNondelivery>(recpt); - return std::unique_ptr<IEventHandle>(ev.release()); + return std::unique_ptr<IEventHandle>(new IEventHandle(ev->Type, updatedFlags, recp, ev->Sender, ev->Buffer, ev->Cookie, &ev->Recipient, std::move(ev->TraceId))); } - if (ev->TrackDelivery) { + if (ev->Flags & FlagTrackDelivery) { const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered); - return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags, + return std::unique_ptr<IEventHandle>(new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags, ev->Cookie, nullptr, std::move(ev->TraceId))); } return {}; diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index c8267fa715..611899c6ef 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -176,7 +176,7 @@ namespace NActors { NProfiling::TMemoryTagScope::Reset(ActorSystem->MemProfActivityBase + activityType); } - actor->Receive(ev); + actor->Receive(ev, ctx); size_t dyingActorsCnt = DyingActors.size(); Ctx.UpdateActorsStats(dyingActorsCnt); @@ -202,7 +202,7 @@ namespace NActors { } else { actorType = nullptr; - TAutoPtr<IEventHandle> nonDelivered = IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown); + TAutoPtr<IEventHandle> nonDelivered = IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::ReasonActorUnknown); if (nonDelivered.Get()) { ActorSystem->Send(nonDelivered); } else { diff --git a/library/cpp/actors/core/io_dispatcher.cpp b/library/cpp/actors/core/io_dispatcher.cpp index cccb2336ff..6bd753f2e0 100644 --- a/library/cpp/actors/core/io_dispatcher.cpp +++ b/library/cpp/actors/core/io_dispatcher.cpp @@ -116,7 +116,7 @@ namespace NActors { bool sendNotify; if (!Actor.TaskQueue.Dequeue(tasks, &sendNotify)) { if (sendNotify) { - ActorSystem->Send(new IEventHandleFat(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(), + ActorSystem->Send(new IEventHandle(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(), nullptr, TThread::CurrentThreadId())); } break; diff --git a/library/cpp/actors/core/io_dispatcher.h b/library/cpp/actors/core/io_dispatcher.h index a33cb0d98e..b0e4e60d1a 100644 --- a/library/cpp/actors/core/io_dispatcher.h +++ b/library/cpp/actors/core/io_dispatcher.h @@ -28,7 +28,7 @@ namespace NActors { */ template<typename TCallback> static void InvokeIoCallback(TCallback&& callback, ui32 poolId, IActor::EActivityType activityType) { - if (!TActivationContext::Send(new IEventHandleFat(MakeIoDispatcherActorId(), TActorId(), + if (!TActivationContext::Send(new IEventHandle(MakeIoDispatcherActorId(), TActorId(), new TEvInvokeQuery(callback)))) { TActivationContext::Register(CreateExecuteLaterActor(std::move(callback), activityType), TActorId(), TMailboxType::HTSwap, poolId); diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index c7bed09f1b..7c569a2dad 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -316,7 +316,7 @@ namespace NActors { { const NLog::TSettings *mSettings = ctx.LoggerSettings(); TLoggerActor::Throttle(*mSettings); - ctx.Send(new IEventHandleFat(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); + ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); } template <typename TCtx, typename... TArgs> diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp index 572af788ec..995e3c4121 100644 --- a/library/cpp/actors/core/log_ut.cpp +++ b/library/cpp/actors/core/log_ut.cpp @@ -105,19 +105,19 @@ namespace { {} void WriteLog() { - Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); + Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); } void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg, TString msg = "foo") { - Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)}); + Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)}); } void FlushLogBuffer() { - Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TFlushLogBuffer()}); + Runtime.Send(new IEventHandle{LoggerActor, {}, new TFlushLogBuffer()}); } void Wakeup() { - Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvents::TEvWakeup}); + Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup}); } TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()}; diff --git a/library/cpp/actors/dnsresolver/dnsresolver.cpp b/library/cpp/actors/dnsresolver/dnsresolver.cpp index 396194b266..71e7f4d037 100644 --- a/library/cpp/actors/dnsresolver/dnsresolver.cpp +++ b/library/cpp/actors/dnsresolver/dnsresolver.cpp @@ -328,7 +328,7 @@ namespace NDnsResolver { } result->Status = status; - reqCtx->ActorSystem->Send(new IEventHandleFat(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie)); + reqCtx->ActorSystem->Send(new IEventHandle(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie)); break; } @@ -356,7 +356,7 @@ namespace NDnsResolver { } result->Status = status; - reqCtx->ActorSystem->Send(new IEventHandleFat(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie)); + reqCtx->ActorSystem->Send(new IEventHandle(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie)); break; } } diff --git a/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp b/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp index 5f9fd3c444..89a7e9ab36 100644 --- a/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp +++ b/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp @@ -256,7 +256,7 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) { } void Sleep(TDuration duration) { - Schedule(new IEventHandleFat(Sleeper, Sleeper, new TEvents::TEvWakeup), duration); + Schedule(new IEventHandle(Sleeper, Sleeper, new TEvents::TEvWakeup), duration); GrabEdgeEventRethrow<TEvents::TEvWakeup>(Sleeper); } @@ -272,11 +272,11 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) { } void SendGetHostByName(const TActorId& sender, const TString& name, int family = AF_UNSPEC) { - Send(new IEventHandleFat(Resolver, sender, new TEvDns::TEvGetHostByName(name, family)), 0, true); + Send(new IEventHandle(Resolver, sender, new TEvDns::TEvGetHostByName(name, family)), 0, true); } void SendGetAddr(const TActorId& sender, const TString& name, int family = AF_UNSPEC) { - Send(new IEventHandleFat(Resolver, sender, new TEvDns::TEvGetAddr(name, family)), 0, true); + Send(new IEventHandle(Resolver, sender, new TEvDns::TEvGetAddr(name, family)), 0, true); } TEvDns::TEvGetHostByNameResult::TPtr WaitGetHostByName(const TActorId& sender) { @@ -387,7 +387,7 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) { runtime.SendGetAddr(sender, "yandex.ru", AF_UNSPEC); runtime.ExpectGetAddrSuccess(sender, "2a02:6b8:a::a"); - runtime.Send(new IEventHandleFat(runtime.MockResolver, { }, new TEvents::TEvPoison), 0, true); + runtime.Send(new IEventHandle(runtime.MockResolver, { }, new TEvents::TEvPoison), 0, true); runtime.SendGetAddr(sender, "foo.ru", AF_UNSPEC); runtime.ExpectGetAddrError(sender, ARES_ENOTINITIALIZED); } @@ -640,7 +640,7 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) { runtime.SendGetHostByName(sender, "yandex.ru", AF_UNSPEC); runtime.SendGetAddr(sender, "yandex.ru", AF_UNSPEC); - runtime.Send(new IEventHandleFat(runtime.Resolver, sender, new TEvents::TEvPoison), 0, true); + runtime.Send(new IEventHandle(runtime.Resolver, sender, new TEvents::TEvPoison), 0, true); runtime.ExpectGetHostByNameError(sender, ARES_ECANCELLED); runtime.ExpectGetAddrError(sender, ARES_ECANCELLED); } diff --git a/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp b/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp index d15fd54192..2758484552 100644 --- a/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp +++ b/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp @@ -13,7 +13,7 @@ Y_UNIT_TEST_SUITE(OnDemandDnsResolver) { runtime.Initialize(); auto sender = runtime.AllocateEdgeActor(); auto resolver = runtime.Register(CreateOnDemandDnsResolver()); - runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)), + runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender); UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText); diff --git a/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp b/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp index 089bd5179f..0c343a805c 100644 --- a/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp +++ b/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp @@ -28,7 +28,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) { runtime.Initialize(); auto sender = runtime.AllocateEdgeActor(); auto resolver = runtime.Register(CreateSimpleDnsResolver()); - runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)), + runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender); UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText); @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) { runtime.Initialize(); auto sender = runtime.AllocateEdgeActor(); auto resolver = runtime.Register(CreateSimpleDnsResolver()); - runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("yandex.ru", AF_UNSPEC)), + runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("yandex.ru", AF_UNSPEC)), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender); UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText); @@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) { auto sender = runtime.AllocateEdgeActor(); auto resolver = runtime.Register(CreateSimpleDnsResolver()); - runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetAddr("yandex.ru", AF_UNSPEC)), + runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetAddr("yandex.ru", AF_UNSPEC)), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetAddrResult>(sender); UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText); @@ -72,7 +72,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) { options.Attempts = 2; options.Servers.emplace_back(TStringBuilder() << "127.0.0.1:" << server.Port); auto resolver = runtime.Register(CreateSimpleDnsResolver(options)); - runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)), + runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender); UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, ARES_ETIMEOUT, ev->Get()->ErrorText); @@ -88,9 +88,9 @@ Y_UNIT_TEST_SUITE(DnsResolver) { options.Attempts = 5; options.Servers.emplace_back(TStringBuilder() << "127.0.0.1:" << server.Port); auto resolver = runtime.Register(CreateSimpleDnsResolver(options)); - runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)), + runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)), 0, true); - runtime.Send(new IEventHandleFat(resolver, sender, new TEvents::TEvPoison), 0, true); + runtime.Send(new IEventHandle(resolver, sender, new TEvents::TEvPoison), 0, true); auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender); UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, ARES_ECANCELLED, ev->Get()->ErrorText); } diff --git a/library/cpp/actors/examples/02_discovery/lookup.cpp b/library/cpp/actors/examples/02_discovery/lookup.cpp index 469fba74fa..979a38d215 100644 --- a/library/cpp/actors/examples/02_discovery/lookup.cpp +++ b/library/cpp/actors/examples/02_discovery/lookup.cpp @@ -14,7 +14,7 @@ class TExampleLookupRequestActor : public TActor<TExampleLookupRequestActor> { void Registered(TActorSystem* sys, const TActorId&) override { const auto flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession; - sys->Send(new IEventHandleFat(Replica, SelfId(), new TEvExample::TEvReplicaLookup(Key), flags)); + sys->Send(new IEventHandle(Replica, SelfId(), new TEvExample::TEvReplicaLookup(Key), flags)); } void PassAway() override { diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp index 4f67f85a3b..104d239481 100644 --- a/library/cpp/actors/helpers/flow_controlled_queue.cpp +++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp @@ -102,7 +102,7 @@ class TFlowControlledRequestQueue : public IActorCallback { Subscribed = true; } - TActivationContext::Send(new IEventHandleFat(Target, reqActorId, IEventHandleFat::GetFat(ev.Get())->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie)); + TActivationContext::Send(new IEventHandle(Target, reqActorId, ev.Get()->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie)); } void PumpQueue() { @@ -123,7 +123,7 @@ class TFlowControlledRequestQueue : public IActorCallback { if (reqActor) { if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) { TActivationContext::Send( - new IEventHandleFat(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) + new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie) ); } reqActor->PassAway(); @@ -136,7 +136,7 @@ class TFlowControlledRequestQueue : public IActorCallback { const auto reason = TEvents::TEvUndelivered::Disconnected; if (ev->Flags & IEventHandle::FlagTrackDelivery) { TActivationContext::Send( - new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie) + new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie) ); } } @@ -175,8 +175,7 @@ public: auto it = Find(RegisteredRequests, reqActor); if (it == RegisteredRequests.end()) return; - IEventHandle::Forward(ev, reqActor->Source); - TActivationContext::Send(ev); + TActivationContext::Send(ev->Forward(reqActor->Source).Release()); const TDuration reqLatency = reqActor->AccumulatedLatency(); if (reqLatency < MinimalSeenLatency) MinimalSeenLatency = reqLatency; diff --git a/library/cpp/actors/helpers/future_callback.h b/library/cpp/actors/helpers/future_callback.h index 4db11c7313..8ca0d99fda 100644 --- a/library/cpp/actors/helpers/future_callback.h +++ b/library/cpp/actors/helpers/future_callback.h @@ -7,7 +7,7 @@ namespace NActors { template <typename EventType> struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> { - using TCallback = std::function<void(TAutoPtr<TEventHandleFat<EventType>>&)>; + using TCallback = std::function<void(TAutoPtr<TEventHandle<EventType>>&)>; using TBase = TActor<TActorFutureCallback<EventType>>; TCallback Callback; @@ -30,29 +30,4 @@ struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> { } }; -template <typename EventType> -struct TActorFutureCallbackLight : TActor<TActorFutureCallbackLight<EventType>> { - using TCallback = std::function<void(TAutoPtr<EventType>&)>; - using TBase = TActor<TActorFutureCallbackLight<EventType>>; - TCallback Callback; - - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::ACTOR_FUTURE_CALLBACK; - } - - TActorFutureCallbackLight(TCallback&& callback) - : TBase(&TActorFutureCallbackLight::StateWaitForEvent) - , Callback(std::move(callback)) - {} - - STRICT_LIGHTFN(StateWaitForEvent, - hFunc(EventType, Handle) - ) - - void Handle(typename EventType::TPtr ev) { - Callback(ev); - TBase::PassAway(); - } -}; - } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index ed4c0972fb..542f817755 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) { const TActorId actorId = runtime->Register(actor); Y_UNUSED(actorId); - //runtime.Send(new IEventHandleFat(actorId, sender, new TEvSelfPing::TEvPing(0.0))); + //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0))); // TODO check after events are handled //Sleep(TDuration::Seconds(1)); diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp index ddc3c67011..fa63783c2d 100644 --- a/library/cpp/actors/http/http_proxy_incoming.cpp +++ b/library/cpp/actors/http/http_proxy_incoming.cpp @@ -59,7 +59,7 @@ public: } TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override { - return new IEventHandleFat(self, parent, new TEvents::TEvBootstrap()); + return new IEventHandle(self, parent, new TEvents::TEvBootstrap()); } void Die(const TActorContext& ctx) override { diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index 7fbcb3f3d6..e06de07867 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -303,22 +303,22 @@ Y_UNIT_TEST_SUITE(HttpProxy) { NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); actorSystem.DispatchEvents(); NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://127.0.0.1:" + ToString(port) + "/test"); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); UNIT_ASSERT_EQUAL(request->Request->URL, "/test"); NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n"); - actorSystem.Send(new NActors::IEventHandleFat(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); + actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); @@ -336,22 +336,22 @@ Y_UNIT_TEST_SUITE(HttpProxy) { NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); actorSystem.DispatchEvents(); NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test"); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); UNIT_ASSERT_EQUAL(request->Request->URL, "/test"); NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n"); - actorSystem.Send(new NActors::IEventHandleFat(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); + actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); @@ -432,22 +432,22 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V add->CertificateFile = certificateFile.Name(); add->PrivateKeyFile = certificateFile.Name(); ///////// - actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), add.Release()), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true); actorSystem.DispatchEvents(); NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test"); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); UNIT_ASSERT_EQUAL(request->Request->URL, "/test"); NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n"); - actorSystem.Send(new NActors::IEventHandleFat(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); + actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); @@ -487,11 +487,11 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V NActors::IActor* proxy = NHttp::CreateHttpProxy(); NActors::TActorId proxyId = actorSystem.Register(proxy); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); actorSystem.DispatchEvents(); NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test"); @@ -499,7 +499,7 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V TString longHeader; longHeader.append(9000, 'X'); httpRequest->Set(longHeader, "data"); - actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); diff --git a/library/cpp/actors/interconnect/event_filter.h b/library/cpp/actors/interconnect/event_filter.h index de3fdc2a04..47dabf5f16 100644 --- a/library/cpp/actors/interconnect/event_filter.h +++ b/library/cpp/actors/interconnect/event_filter.h @@ -31,7 +31,7 @@ namespace NActors { evSpaceIndex[subtype] = routes; } - bool CheckIncomingEvent(const IEventHandleFat& ev, const TScopeId& localScopeId) const { + bool CheckIncomingEvent(const IEventHandle& ev, const TScopeId& localScopeId) const { TRouteMask routes = 0; if (const auto& evSpaceIndex = ScopeRoutes[ev.Type >> 16]) { const ui16 subtype = ev.Type & 65535; diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h index 112a0b1b6e..e2e714a213 100644 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -12,14 +12,14 @@ namespace NActors { TBrokerLeaseHolder(TActorId waiterId, TActorId brokerId) : WaiterId(waiterId) , BrokerId(brokerId) { - if (TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { + if (TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { LeaseRequested = true; } } ~TBrokerLeaseHolder() { if (LeaseRequested) { - TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); + TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); } } diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index ce4adcdec2..312eff2666 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -59,12 +59,10 @@ namespace NActors { TEventHolder& event = Pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; - if (ev.IsEventFat()) { - if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { - event.Span - .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) - .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); - } + if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { + event.Span + .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) + .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); } return std::make_pair(bytes, &event); } diff --git a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp index e917a27ea7..a450d16871 100644 --- a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp +++ b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp @@ -18,7 +18,7 @@ namespace NActors { , Mock(mock) {} - STATEFN(StateFunc) { + STFUNC(StateFunc) { if (ev->GetTypeRewrite() == TEvents::TSystem::Poison && !Proxy) { PassAway(); } else { @@ -32,7 +32,7 @@ namespace NActors { } Y_VERIFY(Proxy); } - InvokeOtherActor(*Proxy, &IActor::Receive, ev); + InvokeOtherActor(*Proxy, &IActor::Receive, ev, ctx); } } }; diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp index 0b0b112628..d638ff830c 100644 --- a/library/cpp/actors/interconnect/interconnect_resolve.cpp +++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp @@ -120,7 +120,7 @@ namespace NActors { LOG_DEBUG_IC("ICR03", "Host: %s, RESOLVED address", Host.c_str()); auto reply = new TEvAddressInfo; reply->Address = std::move(addr); - TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply)); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply)); PassAway(); } @@ -129,7 +129,7 @@ namespace NActors { auto reply = std::make_unique<TEvLocalNodeInfo>(); reply->NodeId = *NodeId; reply->Addresses = std::move(addresses); - TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply.release())); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release())); PassAway(); } @@ -138,7 +138,7 @@ namespace NActors { auto *event = new TEvResolveError; event->Explain = errorText; event->Host = Host; - TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, event)); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, event)); PassAway(); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 122f312fec..32f10c727b 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -334,31 +334,7 @@ namespace NActors { TEventSerializationInfo serializationInfo{ .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), }; - auto es = GetEventSpace(descr.Type); - if (es < TEventFactories::EventFactories.size() && TEventFactories::EventFactories[es] != nullptr) { - const auto& estvec(*TEventFactories::EventFactories[es]); - auto est = GetEventSubType(descr.Type); - if (est < estvec.size() && estvec[est] != nullptr) { - IEventFactory* factory = estvec[est]; - TAutoPtr<IEventHandle> ev = factory->Construct({ - .Session = SessionId, - .Type = descr.Type, - .Flags = descr.Flags, - .Recipient = descr.Recipient, - .Sender = descr.Sender, - .Cookie = descr.Cookie, - .OriginScopeId = Params.PeerScopeId, - .TraceId = std::move(descr.TraceId), - .Data = std::move(data), - }); - if (ev) { - TActivationContext::Send(ev); - } - return; - } - } - - auto ev = std::make_unique<IEventHandleFat>(SessionId, + auto ev = std::make_unique<IEventHandle>(SessionId, descr.Type, descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 8ee93dfc7a..8562f6a440 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -47,7 +47,7 @@ namespace NActors { void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) { if (!DynamicPtr) { // perform usual bootstrap for static nodes - sys->Send(new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); + sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0)); } if (const auto& mon = Common->RegisterMonPage) { TString path = Sprintf("peer%04" PRIu32, PeerNodeId); @@ -591,7 +591,7 @@ namespace NActors { // we have found cancellation request for the pending handshake request; so simply remove it from the // deque, as we are not interested in failure reason; must likely it happens because of handshake timeout if (pendingEvent->GetTypeRewrite() == TEvHandshakeFail::EventType) { - TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(pendingEvent.Release())); + TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(pendingEvent.Release())); LogHandshakeFail(tmp, true); } PendingIncomingHandshakeEvents.erase(it); @@ -605,7 +605,7 @@ namespace NActors { Y_VERIFY(Session && SessionID); ValidateEvent(ev, "ForwardSessionEventToSession"); - InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev); + InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID)); } void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) { @@ -774,7 +774,7 @@ namespace NActors { for (auto& ev : PendingIncomingHandshakeEvents) { Send(ev->Sender, new TEvents::TEvPoisonPill); if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) { - TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(ev.Release())); + TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release())); LogHandshakeFail(tmp, true); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index ebf02c3f27..71edfccbe2 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -63,10 +63,10 @@ namespace NActors { TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr); - STATEFN(StateInit) { + STFUNC(StateInit) { Bootstrap(); if (ev->Type != TEvents::TSystem::Bootstrap) { // for dynamic nodes we do not receive Bootstrap event - Receive(ev); + Receive(ev, ctx); } } @@ -180,7 +180,7 @@ namespace NActors { } else if (DynamicPtr) { PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15); if (!PassAwayScheduled) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(), + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), {}, nullptr, 0)); PassAwayScheduled = true; } @@ -205,7 +205,7 @@ namespace NActors { if (now >= PassAwayTimestamp) { PassAway(); } else if (PassAwayTimestamp != TMonotonic::Max()) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(), + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), {}, nullptr, 0)); } else { PassAwayScheduled = false; @@ -370,21 +370,6 @@ namespace NActors { ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func); } - void ValidateEvent(IEventHandle* ev, const char* func) { - if (SelfId().NodeId() == PeerNodeId) { - TString msg = Sprintf("Event Type# 0x%08" PRIx32 " TypeRewrite# 0x%08" PRIx32 - " from Sender# %s sent to the proxy for the node itself via Interconnect;" - " THIS IS NOT A BUG IN INTERCONNECT, check the event sender instead", - ev->Type, ev->GetTypeRewrite(), ev->Sender.ToString().data()); - LOG_ERROR_IC("ICP03", "%s", msg.data()); - Y_VERIFY_DEBUG(false, "%s", msg.data()); - } - - Y_VERIFY(ev->GetTypeRewrite() != TEvInterconnect::EvForward || ev->Recipient.NodeId() == PeerNodeId, - "Recipient/Proxy NodeId mismatch Recipient# %s Type# 0x%08" PRIx32 " PeerNodeId# %" PRIu32 " Func# %s", - ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func); - } - // Common with helpers // All proxy actors share the same information in the object // read only diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index 316c233af3..aad8677ca4 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -25,7 +25,7 @@ namespace NActors { } TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { - return new IEventHandleFat(self, parentId, new TEvents::TEvBootstrap, 0); + return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); } void TInterconnectListenerTCP::Die(const TActorContext& ctx) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 5a93bc0cc8..a336e4a89f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -112,8 +112,8 @@ namespace NActors { Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); } - void TInterconnectSessionTCP::Forward(TAutoPtr<IEventHandle>& ev) { - Proxy->ValidateEvent(ev.Get(), "Forward"); + void TInterconnectSessionTCP::Forward(STATEFN_SIG) { + Proxy->ValidateEvent(ev, "Forward"); LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data()); ++MessagesGot; @@ -126,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev.Get()); + const auto [dataSize, event] = oChannel.Push(*ev); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; @@ -167,7 +167,7 @@ namespace NActors { } else if (!RamInQueue) { Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); RamInQueue = new TEvRam(true); - auto *ev = new IEventHandleFat(SelfId(), {}, RamInQueue); + auto *ev = new IEventHandle(SelfId(), {}, RamInQueue); const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod; if (batchPeriod != TDuration()) { TActivationContext::Schedule(batchPeriod, ev); @@ -179,7 +179,7 @@ namespace NActors { } } - void TInterconnectSessionTCP::Subscribe(TAutoPtr<IEventHandle>& ev) { + void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); if (inserted) { @@ -190,7 +190,7 @@ namespace NActors { Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie); } - void TInterconnectSessionTCP::Unsubscribe(TEvents::TEvUnsubscribe::TPtr ev) { + void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 9d8bb90ecd..598a5c9220 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -347,16 +347,16 @@ namespace NActors { void Terminate(TDisconnectReason reason); void PassAway() override; - void Forward(LIGHTFN_SIG); - void Subscribe(TAutoPtr<IEventHandle>& ev); - void Unsubscribe(TEvents::TEvUnsubscribe::TPtr); + void Forward(STATEFN_SIG); + void Subscribe(STATEFN_SIG); + void Unsubscribe(STATEFN_SIG); - STRICT_LIGHTFN(StateFunc, + STRICT_STFUNC(StateFunc, fFunc(TEvInterconnect::EvForward, Forward) cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison) fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe) fFunc(TEvents::TEvSubscribe::EventType, Subscribe) - hFunc(TEvents::TEvUnsubscribe, Unsubscribe) + fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe) cFunc(TEvFlush::EventType, HandleFlush) hFunc(TEvPollerReady, Handle) hFunc(TEvPollerRegisterResult, Handle) @@ -531,7 +531,7 @@ namespace NActors { auto sender = SelfId(); const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { auto ev = new TEvSessionBufferSizeRequest(); - return new IEventHandleFat(recp, sender, ev, IEventHandle::FlagTrackDelivery); + return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); }; RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric); Become(&TInterconnectSessionKiller::StateFunc); diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index 65e1d202ff..d460903f35 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -82,7 +82,7 @@ namespace NInterconnect { ) void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { - ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]).Release()); + ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex])); if (++SlaveIndex == Slaves.size()) { SlaveIndex = 0; } diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp index 5619f0f113..0aadc7ae35 100644 --- a/library/cpp/actors/interconnect/mock/ic_mock.cpp +++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp @@ -60,11 +60,11 @@ namespace NActors { TPeerInfo *peer = GetPeer(peerNodeId); auto guard = TReadGuard(peer->Mutex); if (peer->ActorSystem) { - peer->ActorSystem->Send(new IEventHandleFat(peer->ProxyId, TActorId(), new TEvInject(std::move(messages), + peer->ActorSystem->Send(new IEventHandle(peer->ProxyId, TActorId(), new TEvInject(std::move(messages), originScopeId, senderSessionId))); } else { for (auto&& ev : messages) { - TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected)); } } } @@ -78,7 +78,7 @@ namespace NActors { TPeerInfo *peer = GetPeer(peerNodeId); auto guard = TReadGuard(peer->Mutex); if (peer->ActorSystem) { - peer->ActorSystem->Send(new IEventHandleFat(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0)); + peer->ActorSystem->Send(new IEventHandle(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0)); } } @@ -114,7 +114,7 @@ namespace NActors { void Terminate() { for (auto&& ev : std::exchange(Queue, {})) { - TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected)); } for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); @@ -130,7 +130,7 @@ namespace NActors { Subscribe(ev->Sender, ev->Cookie); } if (Queue.empty()) { - TActivationContext::Send(new IEventHandleFat(EvRam, 0, SelfId(), {}, {}, 0)); + TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0)); } Queue.emplace_back(ev.Release()); } @@ -193,7 +193,7 @@ namespace NActors { } template <typename TEvent> - bool CheckNodeStatus(TAutoPtr<TEventHandleFat<TEvent>>& ev) { + bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) { if (PeerNodeStatus != EPeerNodeStatus::EXISTS) { std::unique_ptr<IEventHandle> tmp(ev.Release()); CheckNonexistentNode(tmp); @@ -224,7 +224,7 @@ namespace NActors { if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie); } - TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected)); + TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected)); break; case TEvents::TEvSubscribe::EventType: @@ -252,7 +252,7 @@ namespace NActors { while (!WaitingConnections.empty()) { TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release()); WaitingConnections.pop_front(); - Receive(tmp); + Receive(tmp, TActivationContext::AsActorContext()); } } }; @@ -287,28 +287,20 @@ namespace NActors { return; // drop messages from other sessions } if (auto *session = GetSession()) { - for (auto&& evb : ev->Get()->Messages) { - if (ev->IsEventLight()) { - // TODO(xenoxeno): - //if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { - TActivationContext::Send(evb.release()); - //} - } else { - auto* ev = IEventHandleFat::GetFat(evb); - auto fw = std::make_unique<IEventHandleFat>( - session->SelfId(), - ev->Type, - ev->Flags & ~IEventHandle::FlagForwardOnNondelivery, - ev->Recipient, - ev->Sender, - ev->ReleaseChainBuffer(), - ev->Cookie, - msg->OriginScopeId, - std::move(ev->TraceId) - ); - if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { - TActivationContext::Send(fw.release()); - } + for (auto&& ev : ev->Get()->Messages) { + auto fw = std::make_unique<IEventHandle>( + session->SelfId(), + ev->Type, + ev->Flags & ~IEventHandle::FlagForwardOnNondelivery, + ev->Recipient, + ev->Sender, + ev->ReleaseChainBuffer(), + ev->Cookie, + msg->OriginScopeId, + std::move(ev->TraceId) + ); + if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) { + TActivationContext::Send(fw.release()); } } } @@ -330,7 +322,8 @@ namespace NActors { void HandleSessionEvent(TAutoPtr<IEventHandle> ev) { auto *session = GetSession(); - InvokeOtherActor(*session, &TSessionMockActor::Receive, ev); + InvokeOtherActor(*session, &TSessionMockActor::Receive, ev, + TActivationContext::ActorContextFor(session->SelfId())); } void Disconnect() { @@ -351,7 +344,7 @@ namespace NActors { return State.Inject(PeerNodeId, std::move(messages), Common->LocalScopeId, Session->SessionId); } - STRICT_LIGHTFN(StateFunc, + STRICT_STFUNC(StateFunc, cFunc(TEvents::TSystem::Poison, PassAway) fFunc(TEvInterconnect::EvForward, HandleSessionEvent) fFunc(TEvInterconnect::EvConnectNode, HandleSessionEvent) diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp index 7b0bdf3e17..9ba173e330 100644 --- a/library/cpp/actors/interconnect/packet.cpp +++ b/library/cpp/actors/interconnect/packet.cpp @@ -17,25 +17,14 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { EventActuallySerialized = 0; Descr.Checksum = 0; - if (ev.IsEventLight()) { - if (ev.IsEventSerializable()) { - NActors::IEventHandleLightSerializable& serializable(*NActors::IEventHandleLightSerializable::GetLightSerializable(&ev)); - EventSerializer = serializable.Serializer; - EventSerializedSize = 100; - } else { - EventSerializedSize = 0; - } + if (ev.HasBuffer()) { + Buffer = ev.ReleaseChainBuffer(); + EventSerializedSize = Buffer->GetSize(); + } else if (ev.HasEvent()) { + Event.Reset(ev.ReleaseBase()); + EventSerializedSize = Event->CalculateSerializedSize(); } else { - auto& evFat = *IEventHandleFat::GetFat(&ev); - if (evFat.HasBuffer()) { - Buffer = evFat.ReleaseChainBuffer(); - EventSerializedSize = Buffer->GetSize(); - } else if (evFat.HasEvent()) { - Event.Reset(evFat.ReleaseBase()); - EventSerializedSize = Event->CalculateSerializedSize(); - } else { - EventSerializedSize = 0; - } + EventSerializedSize = 0; } return EventSerializedSize; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index c06e648541..f3c506a663 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -113,7 +113,6 @@ struct TEventHolder : TNonCopyable { TActorId ForwardRecipient; THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; - NActors::TEventSerializer EventSerializer; ui64 Serial; ui32 EventSerializedSize; ui32 EventActuallySerialized; @@ -138,11 +137,10 @@ struct TEventHolder : TNonCopyable { const TActorId& s = d.Sender; const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; Span.EndError("nondelivery"); - TAutoPtr<IEventHandle> ev = Event - ? new IEventHandleFat(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) - : new IEventHandleFat(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); - ev = IEventHandle::ForwardOnNondelivery(ev, NActors::TEvents::TEvUndelivered::Disconnected, unsure); - NActors::TActivationContext::Send(ev); + auto ev = Event + ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) + : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); + NActors::TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), NActors::TEvents::TEvUndelivered::Disconnected, unsure)); } void Clear() { diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index 7161c6ca90..e75cbcaef4 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -104,7 +104,7 @@ namespace NActors { protected: void Notify(TSocketRecord *record, bool read, bool write) { auto issue = [&](const TActorId& recipient) { - ActorSystem->Send(new IEventHandleFat(recipient, {}, new TEvPollerReady(record->Socket, read, write))); + ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write))); }; if (read && record->ReadActorId) { issue(record->ReadActorId); diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 9a541aeb86..b1d2e02f49 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -64,8 +64,6 @@ namespace NActors { using NActors::IEventBase; using NActors::IEventHandle; -using NActors::IEventHandleFat; -using NActors::IEventHandleLight; using NActors::TActorId; using NActors::TConstIoVec; using NActors::TEventSerializedData; diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 561248c3e5..32c8237b59 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandleFat>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp index 78158f07cc..3c474979dc 100644 --- a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp +++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp @@ -106,7 +106,7 @@ void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui3 const TActorId target = MakeResponderServiceId(nodeId); for (ui32 i = 0; i < count; ++i) { const ui32 flags = IEventHandle::FlagTrackDelivery; - as->Send(new IEventHandleFat(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i)); + as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i)); } } diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 000f5d4b3e..3596bffd5a 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -46,30 +46,25 @@ public: } const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); - TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, + TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie)); // Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); ++NextCookie; } } - void HandlePong(TAutoPtr<IEventHandle> e) { + void HandlePong(TAutoPtr<IEventHandle> ev) { // Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl); - if (e->IsEventFat()) { - auto ev = IEventHandleFat::GetFat(e); - if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { - auto& [s2cIt, hash] = it->second; - Y_VERIFY(hash == ev->GetChainBuffer()->GetString()); - SessionToCookie.erase(s2cIt); - InFlight.erase(it); - } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) { - Y_VERIFY(it->second == ev->GetChainBuffer()->GetString()); - Tentative.erase(it); - } else { - Y_FAIL("Cookie# %" PRIu64, ev->Cookie); - } + if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { + auto& [s2cIt, hash] = it->second; + Y_VERIFY(hash == ev->GetChainBuffer()->GetString()); + SessionToCookie.erase(s2cIt); + InFlight.erase(it); + } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) { + Y_VERIFY(it->second == ev->GetChainBuffer()->GetString()); + Tentative.erase(it); } else { - Y_FAIL("Pong is not fat"); + Y_FAIL("Cookie# %" PRIu64, ev->Cookie); } IssueQueries(); } @@ -128,13 +123,10 @@ public: {} void HandlePing(TAutoPtr<IEventHandle>& ev) { - if (ev->IsEventFat()) { - auto* evf = IEventHandleFat::GetFat(ev); - const TString& data = evf->GetChainBuffer()->GetString(); - const TString& response = MD5::CalcRaw(data); - TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Pong, 0, evf->Sender, SelfId(), - MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), evf->Cookie)); - } + const TString& data = ev->GetChainBuffer()->GetString(); + const TString& response = MD5::CalcRaw(data); + TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), + MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie)); } STRICT_STFUNC(StateFunc, diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp index 745a020d2a..23d846a2fd 100644 --- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp +++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp @@ -253,7 +253,7 @@ public: private: void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) { auto ev = new TEvPollerRegister{socket, readActorId, writeActorId}; - ActorSystem_->Send(new IEventHandleFat(PollerId_, TActorId{}, ev)); + ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); } private: diff --git a/library/cpp/actors/testlib/decorator_ut.cpp b/library/cpp/actors/testlib/decorator_ut.cpp index 3c70d25ec0..fe5c769290 100644 --- a/library/cpp/actors/testlib/decorator_ut.cpp +++ b/library/cpp/actors/testlib/decorator_ut.cpp @@ -28,7 +28,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) { virtual ~TDyingChecker() { Write("TDyingChecker::~TDyingChecker"); - TActivationContext::Send(new IEventHandleFat(MasterId, SelfId(), new TEvents::TEvPing())); + TActivationContext::Send(new IEventHandle(MasterId, SelfId(), new TEvents::TEvPing())); } bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { @@ -104,7 +104,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) { return true; } Write("TFizzBuzzToFooBar::DoBeforeSending"); - TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get()); + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); UNIT_ASSERT(handle); TEvWords *event = handle->Get(); TVector<TString> &words = event->Words; @@ -144,7 +144,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) { return true; } Write("TWordEraser::DoBeforeSending"); - TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get()); + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); UNIT_ASSERT(handle); TEvWords *event = handle->Get(); TVector<TString> &words = event->Words; @@ -176,7 +176,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) { return true; } Write("TWithoutWordsDroper::DoBeforeSending"); - TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get()); + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); UNIT_ASSERT(handle); TEvWords *event = handle->Get(); return bool(event->Words); @@ -208,7 +208,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) { } STATEFN(State) { - TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get()); + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); UNIT_ASSERT(handle); UNIT_ASSERT(handle->Sender == MasterId); TEvWords *event = handle->Get(); diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 889edc4969..962cfe81d4 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -48,10 +48,10 @@ namespace NActors { Cerr << ", "; if (ev->HasEvent()) Cerr << " : " << (PRINT_EVENT_BODY ? ev->ToString() : ev->GetTypeName()); - // else if (ev->HasBuffer()) - // Cerr << " : BUFFER"; - // else - // Cerr << " : EMPTY"; + else if (ev->HasBuffer()) + Cerr << " : BUFFER"; + else + Cerr << " : EMPTY"; Cerr << "\n"; } @@ -397,7 +397,7 @@ namespace NActors { TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite()); TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; - recipientActor->Receive(ev); + recipientActor->Receive(ev, ctx); TlsActivationContext = prevTlsActivationContext; // we expect the logger to never die in tests } @@ -1420,9 +1420,8 @@ namespace NActors { } } - void TTestActorRuntimeBase::Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventHandleLight> ev, ui32 senderNodeIndex, bool viaActorSystem) { - ev->PrepareSend(recipient, sender); - Send(ev.Release(), senderNodeIndex, viaActorSystem); + void TTestActorRuntimeBase::Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 senderNodeIndex, bool viaActorSystem) { + Send(new IEventHandle(recipient, sender, ev.Release()), senderNodeIndex, viaActorSystem); } void TTestActorRuntimeBase::Send(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex, bool viaActorSystem) { @@ -1628,7 +1627,7 @@ namespace NActors { TCallstack::GetTlsCallstack() = ev->Callstack; TCallstack::GetTlsCallstack().SetLinesToSkip(); #endif - recipientActor->Receive(ev); + recipientActor->Receive(ev, ctx); node->ExecutorThread->DropUnregistered(); } CurrentRecipient = TActorId(); @@ -1866,8 +1865,7 @@ namespace NActors { if (HasReply) { delete Context->Queue->Pop(); } - IEventHandle::Forward(ev, originalSender); - ctx.ExecutorThread.Send(ev); + ctx.ExecutorThread.Send(IEventHandle::Forward(ev, originalSender)); if (!IsSync && Context->Queue->Head()) { SendHead(ctx); } @@ -1899,17 +1897,9 @@ namespace NActors { TAutoPtr<IEventHandle> GetForwardedEvent() { IEventHandle* ev = Context->Queue->Head(); ReplyChecker->OnRequest(ev); - TAutoPtr<IEventHandle> forwardedEv; - if (ev->IsEventLight()) { - IEventHandleLight* evl = IEventHandleLight::GetLight(ev); - evl->PrepareSend(Delegatee, ReplyId); - forwardedEv = ev; - } else { - IEventHandleFat* evf = IEventHandleFat::GetFat(ev); - forwardedEv = ev->HasEvent() - ? new IEventHandleFat(Delegatee, ReplyId, evf->ReleaseBase().Release(), evf->Flags, evf->Cookie) - : new IEventHandleFat(evf->GetTypeRewrite(), evf->Flags, Delegatee, ReplyId, evf->ReleaseChainBuffer(), evf->Cookie); - } + TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent() + ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie) + : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie); return forwardedEv; } diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index c48708b975..0c1e4207cc 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -260,7 +260,7 @@ namespace NActors { bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions()); bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout); bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline); - void Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventHandleLight> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); + void Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); void Send(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); void SendAsync(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex = 0); void Schedule(TAutoPtr<IEventHandle> ev, const TDuration& duration, ui32 nodeIndex = 0); @@ -371,7 +371,7 @@ namespace NActors { std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; }; GrabEdgeEventIf(handle, truth, simTimeout); if (handle) { - return THolder<TEvent>(IEventHandle::Release<TEvent>(handle)); + return THolder<TEvent>(handle->Release<TEvent>()); } return {}; } diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp index 42aa3a6fbd..dcd458be7c 100644 --- a/library/cpp/actors/wilson/wilson_span.cpp +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -54,7 +54,7 @@ namespace NWilson { void TSpan::Send() { if (TlsActivationContext) { - TActivationContext::Send(new IEventHandleFat(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); + TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); } Data->Sent = true; } diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp index b9b4ee2b21..3e47a8a315 100644 --- a/library/cpp/actors/wilson/wilson_uploader.cpp +++ b/library/cpp/actors/wilson/wilson_uploader.cpp @@ -160,7 +160,7 @@ namespace NWilson { template<typename T> void ScheduleWakeup(T&& deadline) { if (!WakeupScheduled) { - TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::Wakeup, 0, SelfId(), {}, + TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); WakeupScheduled = true; } |