summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core
diff options
context:
space:
mode:
authorxenoxeno <[email protected]>2023-03-09 12:10:01 +0300
committerxenoxeno <[email protected]>2023-03-09 12:10:01 +0300
commitad607bb887619f321dec03b02df8220e01b7f5aa (patch)
tree7d5c87352cbe835b56bb2bdac93b37cbdf8ead21 /library/cpp/actors/core
parent6324d075a5e80b6943b5de6b465b775050fe83df (diff)
light events for actor system
Diffstat (limited to 'library/cpp/actors/core')
-rw-r--r--library/cpp/actors/core/actor.cpp36
-rw-r--r--library/cpp/actors/core/actor.h229
-rw-r--r--library/cpp/actors/core/actor_bootstrapped.h2
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp2
-rw-r--r--library/cpp/actors/core/actor_coroutine.h7
-rw-r--r--library/cpp/actors/core/actor_ut.cpp16
-rw-r--r--library/cpp/actors/core/actor_virtual.h8
-rw-r--r--library/cpp/actors/core/actorsystem.cpp19
-rw-r--r--library/cpp/actors/core/actorsystem.h2
-rw-r--r--library/cpp/actors/core/ask.cpp4
-rw-r--r--library/cpp/actors/core/av_bootstrapped.cpp2
-rw-r--r--library/cpp/actors/core/event.cpp78
-rw-r--r--library/cpp/actors/core/event.h628
-rw-r--r--library/cpp/actors/core/event_load.h2
-rw-r--r--library/cpp/actors/core/event_local.h16
-rw-r--r--library/cpp/actors/core/event_pb.h288
-rw-r--r--library/cpp/actors/core/events_undelivered.cpp71
-rw-r--r--library/cpp/actors/core/executor_thread.cpp10
-rw-r--r--library/cpp/actors/core/io_dispatcher.cpp2
-rw-r--r--library/cpp/actors/core/io_dispatcher.h2
-rw-r--r--library/cpp/actors/core/log.h6
-rw-r--r--library/cpp/actors/core/log_ut.cpp8
-rw-r--r--library/cpp/actors/core/scheduler_actor.cpp2
23 files changed, 1241 insertions, 199 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index 00eef387ea4..304f7405aea 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -16,6 +16,22 @@ namespace NActors {
return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId));
}
+ bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept {
+ return SelfActorId.Send(recipient, ev);
+ }
+
+ bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept {
+ return SelfActorId.Send(recipient, ev, flags);
+ }
+
+ bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept {
+ return SelfActorId.Send(recipient, ev, flags, cookie);
+ }
+
+ bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept {
+ return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId));
+ }
+
void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
}
@@ -29,15 +45,15 @@ namespace NActors {
}
void TActorIdentity::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie);
+ return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie);
}
void TActorIdentity::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie);
+ return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie);
}
void TActorIdentity::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const {
- return TActivationContext::Schedule(delta, new IEventHandle(*this, {}, ev), cookie);
+ return TActivationContext::Schedule(delta, new IEventHandleFat(*this, {}, ev), cookie);
}
TActorId TActivationContext::RegisterWithSameMailbox(IActor* actor, TActorId parentId) {
@@ -75,27 +91,27 @@ namespace NActors {
}
void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
+ ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie);
}
void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
+ ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie);
}
void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie);
+ ExecutorThread.Schedule(delta, new IEventHandleFat(SelfID, TActorId(), ev), cookie);
}
void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
+ TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie);
}
void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
+ TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie);
}
void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
+ TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie);
}
TInstant TActivationContext::Now() {
@@ -145,7 +161,7 @@ namespace NActors {
(actor->*StateFunc)(ev, TActivationContext::AsActorContext());
}
- void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandle> ev) {
+ void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev) {
Y_VERIFY(!!ev && ev->GetBase());
ev->GetBase()->Execute(actor, std::move(ev));
}
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 0e109da8192..08a3beb2bfc 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -59,6 +59,12 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
static bool Send(std::unique_ptr<IEventHandle> &&ev);
+ template <ESendingType SendingType = ESendingType::Common>
+ static bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient);
+
+ template <ESendingType SendingType = ESendingType::Common>
+ static bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient);
+
/**
* Schedule one-shot event that will be send at given time point in the future.
*
@@ -132,10 +138,16 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
template <ESendingType SendingType = ESendingType::Common>
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
+ template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
template <ESendingType SendingType = ESendingType::Common>
+ bool Send(const TActorId& recipient, THolder<IEventHandleLight> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
+ return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId));
+ }
+ template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId));
}
@@ -145,6 +157,15 @@ namespace NActors {
bool Send(std::unique_ptr<IEventHandle> &&ev) const {
return Send<SendingType>(TAutoPtr<IEventHandle>(ev.release()));
}
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const;
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const;
+ template <ESendingType SendingType = ESendingType::Common, typename TEventHandle>
+ bool Forward(TAutoPtr<TEventHandle>& ev, const TActorId& recipient) const {
+ TAutoPtr<IEventHandle> evi(ev.Release());
+ return Forward(evi, recipient);
+ }
TInstant Now() const;
TMonotonic Monotonic() const;
@@ -206,6 +227,15 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Send(const TActorId& recipient, IEventHandleLight* ev) const;
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const;
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const;
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const;
+ bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
@@ -253,7 +283,7 @@ namespace NActors {
class TActorVirtualBehaviour {
public:
- static void Receive(IActor* actor, std::unique_ptr<IEventHandle> ev);
+ static void Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev);
public:
};
@@ -262,8 +292,7 @@ namespace NActors {
using TBase = IActor;
friend class TDecorator;
public:
- typedef void (IActor::* TReceiveFunc)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
-
+ using TReceiveFunc = void (IActor::*)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
private:
TReceiveFunc StateFunc = nullptr;
public:
@@ -303,6 +332,8 @@ namespace NActors {
friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
friend class TDecorator;
protected:
+ using TReceiveFuncLight = void (IActor::*)(TAutoPtr<IEventHandle>& ev);
+ TReceiveFuncLight StateFuncLight = nullptr;
TActorCallbackBehaviour CImpl;
public:
using TReceiveFunc = TActorCallbackBehaviour::TReceiveFunc;
@@ -476,19 +507,28 @@ namespace NActors {
TActorIdentity SelfId() const {
return SelfActorId;
}
- void Receive(TAutoPtr<IEventHandle>& ev, const TActorContext& /*ctx*/) {
+ // void Receive(TAutoPtr<IEventHandle> ev, const TActorContext& /*ctx*/) {
+ // Receive(ev);
+ // }
+ void Receive(TAutoPtr<IEventHandle>& ev) {
++HandledEvents;
- if (CImpl.Initialized()) {
+ if (StateFuncLight) {
+ (this->*StateFuncLight)(ev);
+ } else if (CImpl.Initialized()) {
CImpl.Receive(this, ev);
} else {
- TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandle>(ev.Release()));
+ TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandleFat>(IEventHandleFat::MakeFat(ev).Release()));
}
}
protected:
void Describe(IOutputStream&) const noexcept override;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
- bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
+ bool Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept;
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept;
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept;
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept;
+ bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
@@ -511,6 +551,20 @@ namespace NActors {
return Send(recipient, ev.release(), flags, cookie, std::move(traceId));
}
+ bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
+ return TActivationContext::Forward(ev, recipient);
+ }
+
+ bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
+ return TActivationContext::Forward(ev, recipient);
+ }
+
+ template <typename TEventHandle>
+ bool Forward(TAutoPtr<TEventHandle>& ev, const TActorId& recipient) const {
+ TAutoPtr<IEventHandle> evi(ev.Release());
+ return Forward(evi, recipient);
+ }
+
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
@@ -574,8 +628,78 @@ namespace NActors {
}
};
+ template<typename TDerived>
+ class TActorCallback : public IActorCallback {
+ public:
+ using TFatReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&, const TActorContext&);
+ using TLightReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&);
+
+ private:
+ void BecomeFat(TFatReceiveFunc stateFunc) {
+ IActorCallback::Become(stateFunc);
+ }
+
+ template<typename... TArgs>
+ void BecomeFat(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) {
+ IActorCallback::Become(stateFunc, ctx, std::forward<TArgs>(args)...);
+ }
+
+ template<typename... TArgs>
+ void BecomeFat(TFatReceiveFunc stateFunc, TArgs&&... args) {
+ IActorCallback::Become(stateFunc, std::forward<TArgs>(args)...);
+ }
+
+ void BecomeLight(TLightReceiveFunc stateFuncLight) {
+ StateFuncLight = static_cast<TReceiveFuncLight>(stateFuncLight);
+ }
+
+ public:
+ TActorCallback(TFatReceiveFunc stateFunc, ui32 activityType = OTHER)
+ : IActorCallback(static_cast<TReceiveFunc>(stateFunc), activityType)
+ {
+ }
+
+ TActorCallback(TLightReceiveFunc stateFunc, ui32 activityType = OTHER)
+ : IActorCallback(nullptr, activityType)
+ {
+ Become(stateFunc);
+ }
+
+ template<typename T>
+ void Become(T stateFunc) {
+ IActorCallback::Become(stateFunc);
+ }
+
+ template<typename T, typename... TArgs>
+ void Become(T stateFunc, const TActorContext&, TArgs&&... args) {
+ IActorCallback::Become(stateFunc);
+ Schedule(std::forward<TArgs>(args)...);
+ }
+
+ template<typename T, typename... TArgs>
+ void Become(T stateFunc, TArgs&&... args) {
+ IActorCallback::Become(stateFunc);
+ Schedule(std::forward<TArgs>(args)...);
+ }
+
+ template<>
+ void Become<TLightReceiveFunc>(TLightReceiveFunc stateFunc) {
+ BecomeLight(stateFunc);
+ }
+
+ template<>
+ void Become<TFatReceiveFunc>(TFatReceiveFunc stateFunc) {
+ BecomeFat(stateFunc);
+ }
+
+ template<typename... TArgs>
+ void Become(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) {
+ BecomeFat(stateFunc, ctx, std::forward<TArgs>(args)...);
+ }
+ };
+
template <typename TDerived>
- class TActor: public IActorCallback {
+ class TActor: public TActorCallback<TDerived> {
private:
template <typename T, typename = const char*>
struct HasActorName: std::false_type { };
@@ -601,14 +725,20 @@ namespace NActors {
protected:
//* Comment this function to find unmarked activities
static constexpr IActor::EActivityType ActorActivityType() {
- return EActorActivity::OTHER;
+ return IActorCallback::EActorActivity::OTHER;
} //*/
// static constexpr char ActorName[] = "UNNAMED";
- TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx), ui32 activityType = GetActivityTypeIndex())
- : IActorCallback(static_cast<TReceiveFunc>(func), activityType)
- { }
+ TActor(typename TActorCallback<TDerived>::TFatReceiveFunc func, ui32 activityType = GetActivityTypeIndex())
+ : TActorCallback<TDerived>(func, activityType)
+ {
+ }
+
+ TActor(typename TActorCallback<TDerived>::TLightReceiveFunc func, ui32 activityType = GetActivityTypeIndex())
+ : TActorCallback<TDerived>(func, activityType)
+ {
+ }
public:
typedef TDerived TThis;
@@ -617,8 +747,10 @@ namespace NActors {
#define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx
#define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev
+#define LIGHTFN_SIG TAutoPtr<::NActors::IEventHandle>& ev
#define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx)
#define STATEFN(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& )
+#define LIGHTFN(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)
#define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype);
@@ -630,13 +762,26 @@ namespace NActors {
UNHANDLED_MSG_HANDLER \
}
+#define LIGHTFN_BODY(HANDLERS, UNHANDLED_MSG_HANDLER) \
+ switch (const ui32 etype = ev->GetTypeRewrite()) { \
+ HANDLERS \
+ default: \
+ UNHANDLED_MSG_HANDLER \
+ }
+
#define STRICT_STFUNC_BODY(HANDLERS) STFUNC_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER)
+#define STRICT_LIGHTFN_BODY(HANDLERS) LIGHTFN_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER)
#define STRICT_STFUNC(NAME, HANDLERS) \
void NAME(STFUNC_SIG) { \
STRICT_STFUNC_BODY(HANDLERS) \
}
+#define STRICT_LIGHTFN(NAME, HANDLERS) \
+ void NAME(LIGHTFN_SIG) { \
+ STRICT_LIGHTFN_BODY(HANDLERS) \
+ }
+
#define STRICT_STFUNC_EXC(NAME, HANDLERS, EXCEPTION_HANDLERS) \
void NAME(STFUNC_SIG) { \
try { \
@@ -681,7 +826,7 @@ namespace NActors {
STFUNC(State) {
if (DoBeforeReceiving(ev, ctx)) {
- Actor->Receive(ev, ctx);
+ Actor->Receive(ev);
DoAfterReceiving(ctx);
}
}
@@ -777,8 +922,26 @@ namespace NActors {
}
template <ESendingType SendingType>
+ bool TActivationContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
+ IEventHandle::Forward(ev, recipient);
+ return Send(ev);
+ }
+
+ template <ESendingType SendingType>
+ bool TActivationContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
+ IEventHandle::Forward(ev, recipient);
+ return Send(ev);
+ }
+
+ template <ESendingType SendingType>
bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
+ return Send<SendingType>(new IEventHandleFat(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
+ }
+
+ template <ESendingType SendingType>
+ bool TActorContext::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
+ ev->PrepareSend(recipient, SelfID, flags, cookie, std::move(traceId));
+ return Send<SendingType>(ev);
}
template <ESendingType SendingType>
@@ -787,6 +950,18 @@ namespace NActors {
}
template <ESendingType SendingType>
+ bool TActorContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const {
+ IEventHandle::Forward(ev, recipient);
+ return ExecutorThread.Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
+ bool TActorContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const {
+ IEventHandle::Forward(ev, recipient);
+ return ExecutorThread.Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, parentId);
}
@@ -798,7 +973,31 @@ namespace NActors {
template <ESendingType SendingType>
bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
+ return TActivationContext::Send<SendingType>(new IEventHandleFat(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
+ }
+
+ template <ESendingType SendingType>
+ bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev) const {
+ ev->PrepareSend(recipient, *this);
+ return TActivationContext::Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
+ bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const {
+ ev->PrepareSend(recipient, *this, flags);
+ return TActivationContext::Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
+ bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const {
+ ev->PrepareSend(recipient, *this, flags, cookie);
+ return TActivationContext::Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
+ bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
+ ev->PrepareSend(recipient, *this, flags, cookie, std::move(traceId));
+ return TActivationContext::Send<SendingType>(ev);
}
template <ESendingType SendingType>
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h
index 9d89afcf709..37286d48e28 100644
--- a/library/cpp/actors/core/actor_bootstrapped.h
+++ b/library/cpp/actors/core/actor_bootstrapped.h
@@ -12,7 +12,7 @@ namespace NActors {
class TActorBootstrapped: public TActor<TDerived> {
protected:
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override {
- return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0);
+ return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0);
}
STFUNC(StateBootstrap) {
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index 48c6f4d2d0a..6b55228bd75 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -44,7 +44,7 @@ namespace NActors {
THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TInstant deadline) {
const ui64 cookie = ++WaitCookie;
if (deadline != TInstant::Max()) {
- TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie));
+ TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie));
}
// ensure we have no unprocessed event and return back to actor system to receive one
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index fc9ddeef1f8..9fca8598f18 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -115,6 +115,11 @@ namespace NActors {
bool Send(TAutoPtr<IEventHandle> ev);
+ bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
+ IEventHandle::Forward(ev, recipient);
+ return Send(ev.Release());
+ }
+
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) {
return GetActorContext().Schedule(delta, ev, cookie);
}
@@ -157,7 +162,7 @@ namespace NActors {
{}
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override {
- return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0);
+ return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0);
}
private:
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index 86b19af546e..fe492e531e3 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
ctx.RegisterWithSameMailbox(new TDummyActor());
}
if (Role == ERole::Leader) {
- TAutoPtr<IEventHandle> ev = new IEventHandle(Receiver, SelfId(), new TEvents::TEvPing());
+ TAutoPtr<IEventHandle> ev = new IEventHandleFat(Receiver, SelfId(), new TEvents::TEvPing());
SpecialSend(ev, ctx);
}
}
@@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
if (AllocatesMemory) {
- SpecialSend(new IEventHandle(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx);
+ SpecialSend(new IEventHandleFat(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx);
} else {
std::swap(*const_cast<TActorId*>(&ev->Sender), *const_cast<TActorId*>(&ev->Recipient));
ev->DropRewrite();
@@ -541,14 +541,14 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
{
}
- bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) override {
+ bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
*Counter += 1;
if (ev->Type != TEvents::THelloWorld::Pong) {
- TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing());
+ TAutoPtr<IEventHandle> pingEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPing());
SavedEvent = ev;
- Actor->Receive(pingEv, ctx);
+ Actor->Receive(pingEv);
} else {
- Actor->Receive(SavedEvent, ctx);
+ Actor->Receive(SavedEvent);
}
return false;
}
@@ -566,7 +566,7 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
*Counter += 1;
if (ev->Type == TEvents::THelloWorld::Ping) {
- TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong());
+ TAutoPtr<IEventHandle> pongEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPong());
Send(SelfId(), new TEvents::TEvPong());
return false;
}
@@ -701,7 +701,7 @@ Y_UNIT_TEST_SUITE(TestStateFunc) {
auto sender = runtime.AllocateEdgeActor();
auto testActor = runtime.Register(new TTestActorWithExceptionsStateFunc());
for (ui64 tag = 0; tag < 4; ++tag) {
- runtime.Send(new IEventHandle(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true);
+ runtime.Send(new IEventHandleFat(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Tag, tag);
}
diff --git a/library/cpp/actors/core/actor_virtual.h b/library/cpp/actors/core/actor_virtual.h
index c9c34c4729d..1306252b58d 100644
--- a/library/cpp/actors/core/actor_virtual.h
+++ b/library/cpp/actors/core/actor_virtual.h
@@ -8,7 +8,7 @@ template <class TEvent>
class TEventContext {
private:
TEvent* Event;
- std::unique_ptr<IEventHandle> Handle;
+ std::unique_ptr<IEventHandleFat> Handle;
public:
const TEvent* operator->() const {
return Event;
@@ -16,7 +16,7 @@ public:
const IEventHandle& GetHandle() const {
return *Handle;
}
- TEventContext(std::unique_ptr<IEventHandle> handle)
+ TEventContext(std::unique_ptr<IEventHandleFat> handle)
: Handle(std::move(handle))
{
Y_VERIFY_DEBUG(dynamic_cast<TEvent*>(Handle->GetBase()));
@@ -28,7 +28,7 @@ public:
template <class TEvent, class TExpectedActor>
class IEventForActor: public IEventBase {
protected:
- virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override {
+ virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override {
Y_VERIFY_DEBUG(dynamic_cast<TExpectedActor*>(actor));
auto* actorCorrect = static_cast<TExpectedActor*>(actor);
TEventContext<TEvent> context(std::move(eventPtr));
@@ -41,7 +41,7 @@ public:
template <class TBaseEvent, class TEvent, class TExpectedObject>
class IEventForAnything: public TBaseEvent {
protected:
- virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override {
+ virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override {
auto* objImpl = dynamic_cast<TExpectedObject*>(actor);
if (!objImpl) {
return false;
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index 1ba4852f538..b33861a9668 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -74,7 +74,7 @@ namespace NActors {
if (recpNodeId != NodeId && recpNodeId != 0) {
// if recipient is not local one - rewrite with forward instruction
- Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable());
+ //Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable());
Y_VERIFY(ev->Recipient == recipient,
"Event rewrite from %s to %s would be lost via interconnect",
ev->Recipient.ToString().c_str(),
@@ -96,7 +96,7 @@ namespace NActors {
}
if (target != actorId) {
// a race has occured, terminate newly created actor
- Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
+ Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
}
}
recipient = target;
@@ -110,8 +110,9 @@ namespace NActors {
return true;
}
}
-
- Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown));
+ if (ev) {
+ Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown));
+ }
return false;
}
@@ -121,7 +122,7 @@ namespace NActors {
bool TActorSystem::GenericSend<&IExecutorPool::SpecificSend>(TAutoPtr<IEventHandle> ev) const;
bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie) const {
- return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie));
+ return this->Send(new IEventHandleFat(recipient, DefSelfID, ev, flags, cookie));
}
bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev) const {
@@ -139,6 +140,14 @@ namespace NActors {
}
}
+ bool TActorSystem::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const {
+ return this->Send(ev->PrepareSend(recipient, DefSelfID, flags, cookie));
+ }
+
+ bool TActorSystem::Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags, ui64 cookie) const {
+ return this->Send(ev->PrepareSend(recipient, sender, flags, cookie));
+ }
+
void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
Schedule(deadline - Timestamp(), ev, cookie);
}
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 7002a9f44e8..b506a1e3601 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -204,6 +204,8 @@ namespace NActors {
bool SpecificSend(TAutoPtr<IEventHandle> ev) const;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const;
+ bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const;
+ bool Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const;
/**
* Schedule one-shot event that will be send at given time point in the future.
diff --git a/library/cpp/actors/core/ask.cpp b/library/cpp/actors/core/ask.cpp
index 821c7606d4d..e3fe6d9ee3f 100644
--- a/library/cpp/actors/core/ask.cpp
+++ b/library/cpp/actors/core/ask.cpp
@@ -47,9 +47,9 @@ namespace NActors {
if (ev->GetTypeRewrite() == TTimeout::EventType) {
Promise_.SetException(std::make_exception_ptr(yexception() << "ask timeout"));
} else if (!ExpectedEventType_ || ev->GetTypeRewrite() == ExpectedEventType_) {
- Promise_.SetValue(ev->ReleaseBase());
+ Promise_.SetValue(IEventHandleFat::GetFat(ev.Get())->ReleaseBase());
} else {
- Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << ev->GetBase()->ToString()));
+ Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << IEventHandleFat::GetFat(ev.Get())->GetBase()->ToString()));
}
PassAway();
diff --git a/library/cpp/actors/core/av_bootstrapped.cpp b/library/cpp/actors/core/av_bootstrapped.cpp
index 771177242ec..e1123027926 100644
--- a/library/cpp/actors/core/av_bootstrapped.cpp
+++ b/library/cpp/actors/core/av_bootstrapped.cpp
@@ -7,7 +7,7 @@ public:
};
TAutoPtr<NActors::IEventHandle> TActorAutoStart::AfterRegister(const TActorId& self, const TActorId& parentId) {
- return new IEventHandle(self, parentId, new TEventForStart, 0);
+ return new IEventHandleFat(self, parentId, new TEventForStart, 0);
}
void TActorAutoStart::ProcessEvent(TEventContext<TEventForStart>& ev) {
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp
index 6a5230f825e..b963dfb4ea0 100644
--- a/library/cpp/actors/core/event.cpp
+++ b/library/cpp/actors/core/event.cpp
@@ -7,7 +7,79 @@ namespace NActors {
Max<ui64>(), Max<ui64>()
};
- TIntrusivePtr<TEventSerializedData> IEventHandle::ReleaseChainBuffer() {
+ TAutoPtr<IEventHandle>& IEventHandle::Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient) {
+ if (ev->IsEventLight()) {
+ IEventHandleLight::GetLight(ev.Get())->Forward(recipient);
+ } else {
+ ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient);
+ }
+ return ev;
+ }
+
+ THolder<IEventHandle>& IEventHandle::Forward(THolder<IEventHandle>& ev, TActorId recipient) {
+ if (ev->IsEventLight()) {
+ IEventHandleLight::GetLight(ev.Get())->Forward(recipient);
+ } else {
+ ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient);
+ }
+ return ev;
+ }
+
+ TString IEventHandle::GetTypeName() const {
+ if (IsEventFat()) {
+ auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this));
+ return ev->HasEvent() ? TypeName(*(ev->GetBase())) : TypeName(*this);
+ } else {
+ return TypeName(*this);
+ }
+ }
+
+ TString IEventHandle::ToString() const {
+ if (IsEventFat()) {
+ auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this));
+ return ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?";
+ } else {
+ // TODO(xenoxeno):
+ return TypeName(*this);
+ }
+ }
+
+ bool IEventHandle::HasEvent() const {
+ if (IsEventLight()) {
+ return true;
+ } else {
+ return IEventHandleFat::GetFat(this)->HasEvent();
+ }
+ }
+
+ bool IEventHandle::HasBuffer() const {
+ if (IsEventLight()) {
+ return false;
+ } else {
+ return IEventHandleFat::GetFat(this)->HasBuffer();
+ }
+ }
+
+ TActorId IEventHandle::GetForwardOnNondeliveryRecipient() const {
+ if (IsEventLight()) {
+ return {};
+ } else {
+ return IEventHandleFat::GetFat(this)->GetForwardOnNondeliveryRecipient();
+ }
+ }
+
+ size_t IEventHandle::GetSize() const {
+ if (IsEventLight()) {
+ if (IsEventSerializable()) {
+ return IEventHandleLightSerializable::GetLightSerializable(this)->GetSize();
+ }
+ } else {
+ return IEventHandleFat::GetFat(this)->GetSize();
+ }
+ return 0;
+ }
+
+ TIntrusivePtr<TEventSerializedData> IEventHandleFat::ReleaseChainBuffer() {
if (Buffer) {
TIntrusivePtr<TEventSerializedData> result;
DoSwap(result, Buffer);
@@ -24,7 +96,7 @@ namespace NActors {
return new TEventSerializedData;
}
- TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() {
+ TIntrusivePtr<TEventSerializedData> IEventHandleFat::GetChainBuffer() {
if (Buffer) {
return Buffer;
}
@@ -36,4 +108,6 @@ namespace NActors {
}
return new TEventSerializedData;
}
+
+ std::vector<std::vector<IEventFactory*>*> TEventFactories::EventFactories;
}
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 4eedeb0574a..28e2c95e000 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -9,6 +9,7 @@
#include <util/system/hp_timer.h>
#include <util/generic/maybe.h>
+#include <util/string/builder.h>
namespace NActors {
class TChunkSerializer;
@@ -23,7 +24,7 @@ namespace NActors {
public ISerializerToStream {
protected:
// for compatibility with virtual actors
- virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandle> /*eventPtr*/) {
+ virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandleFat> /*eventPtr*/) {
Y_VERIFY_DEBUG(false);
return false;
}
@@ -33,7 +34,7 @@ namespace NActors {
virtual ~IEventBase() {
}
- bool Execute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) {
+ bool Execute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) {
return DoExecute(actor, std::move(eventPtr));
}
@@ -53,8 +54,70 @@ namespace NActors {
virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
};
- // fat handle
- class IEventHandle : TNonCopyable {
+ struct IEventHandleFields {
+ enum EFlags : ui32 {
+ FlagTrackDelivery = 1 << 0,
+ FlagForwardOnNondelivery = 1 << 1,
+ FlagSubscribeOnSession = 1 << 2,
+ FlagUseSubChannel = 1 << 3,
+ FlagGenerateUnsureUndelivered = 1 << 4,
+ FlagExtendedFormat = 1 << 5,
+ FlagLight = 1 << 18,
+ FlagSerializable = 1 << 19,
+ };
+
+ static constexpr ui32 STICKY_FLAGS = (FlagLight | FlagSerializable);
+
+ TActorId Recipient = {};
+ TActorId Sender = {};
+ ui32 Type = {};
+
+#pragma pack(push, 1)
+ union {
+ ui32 Flags = {};
+ struct {
+ ui32 NormalFlags:6;
+ ui32 ReservedFlags:12;
+ ui32 StickyFlags:2;
+ ui32 Channel:12;
+ };
+ struct {
+ // flags
+ ui32 TrackDelivery:1;
+ ui32 ForwardOnNondeliveryFlag:1;
+ ui32 SubscribeOnSession:1;
+ ui32 UseSubChannel:1;
+ ui32 GenerateUnsureUndelivered:1;
+ ui32 ExtendedFormat:1;
+ // reserved
+ ui32 Reserved:12;
+ // sticky flags
+ ui32 Light:1;
+ ui32 Serializable:1;
+ };
+ };
+#pragma pack(pop)
+
+ ui64 Cookie = {};
+
+ TActorId RewriteRecipient = {};
+ ui32 RewriteType = {};
+
+ NWilson::TTraceId TraceId = {};
+
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ ::NHPTimer::STime SendTime = 0;
+#endif
+ };
+
+ class IEventHandle : public IEventHandleFields {
+ public:
+ IEventHandle(IEventHandleFields&& fields = {})
+ : IEventHandleFields(std::move(fields))
+ {}
+
+ virtual ~IEventHandle() = default;
+
struct TOnNondelivery {
TActorId Recipient;
@@ -64,28 +127,142 @@ namespace NActors {
}
};
+ THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
+
+ ui16 GetChannel() const noexcept {
+ return Channel;
+ }
+
+ ui64 GetSubChannel() const noexcept {
+ return UseSubChannel ? Sender.LocalId() : 0ULL;
+ }
+
+ // deprecate(xenoxeno) ?
+ static const size_t ChannelBits = 12;
+ static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
+
+ static ui32 MakeFlags(ui32 channel, ui32 flags) {
+ Y_VERIFY(channel < (1 << ChannelBits));
+ Y_VERIFY(flags < (1 << ChannelShift));
+ return (flags | (channel << ChannelShift));
+ }
+ //
+
+ void SetFlags(ui32 flags) {
+ Flags = (flags & ~STICKY_FLAGS) | (Flags & STICKY_FLAGS);
+ }
+
+ const TActorId& GetRecipientRewrite() const {
+ return RewriteRecipient;
+ }
+
+ void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
+ RewriteRecipient = recipientRewrite;
+ RewriteType = typeRewrite;
+ }
+
+ void DropRewrite() {
+ RewriteRecipient = Recipient;
+ RewriteType = Type;
+ }
+
+ ui32 GetTypeRewrite() const {
+ return RewriteType;
+ }
+
+ bool IsEventLight() const {
+ return Light;
+ }
+
+ bool IsEventFat() const {
+ return !Light;
+ }
+
+ bool IsEventSerializable() const {
+ return Serializable;
+ }
+
+ template<typename TEventType>
+ TEventType* Get();
+ template<typename TEventType>
+ TEventType* CastAsLocal();
+ template<typename TEventType>
+ TEventType* StaticCastAsLocal();
+ bool HasEvent() const;
+ bool HasBuffer() const;
+ TString GetTypeName() const;
+ TString ToString() const;
+ size_t GetSize() const;
+ static TAutoPtr<IEventHandle>& Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient);
+ static THolder<IEventHandle>& Forward(THolder<IEventHandle>& ev, TActorId recipient);
+ static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure = false);
+ static TAutoPtr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure = false);
+ template<typename TEventType>
+ static TEventType* Release(TAutoPtr<IEventHandle>&);
+ template<typename TEventType>
+ static TEventType* Release(THolder<IEventHandle>&);
+ template<typename TEventType>
+ static TEventType* Release(std::unique_ptr<IEventHandle>&);
+
+ template<typename TEventTypeSmartPtr>
+ static TAutoPtr<IEventHandle> ForwardOnNondelivery(TEventTypeSmartPtr& ev, ui32 reason, bool unsure = false) {
+ TAutoPtr<IEventHandle> evi(ev.Release());
+ return ForwardOnNondelivery(evi, reason, unsure);
+ }
+
+ TActorId GetForwardOnNondeliveryRecipient() const;
+ };
+
+ // fat handle
+ class IEventHandleFat : public IEventHandle, TNonCopyable {
public:
template <typename TEv>
- inline TEv* CastAsLocal() const noexcept {
+ inline TEv* CastAsLocal() const noexcept { // cast with event check
auto fits = GetTypeRewrite() == TEv::EventType;
+ constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());};
+ if constexpr (couldBeCasted) {
+ return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
+ } else {
+ Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data());
+ }
+ }
- return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
+ template <typename TEv>
+ inline TEv* StaticCastAsLocal() const noexcept { // blind cast
+ constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());};
+ if constexpr (couldBeCasted) {
+ return static_cast<TEv*>(Event.Get());
+ } else {
+ Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data());
+ }
}
template <typename TEventType>
TEventType* Get() {
if (Type != TEventType::EventType)
- Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
+ Y_FAIL("%s", (TStringBuilder()
+ << "Event type " << Type
+ << " doesn't match the expected type " << TEventType::EventType
+ << " requested typename " << TypeName<TEventType>()
+ << " actual typename " << GetTypeName()).data());
- if (!Event) {
+ constexpr bool couldBeGot = requires() {
Event.Reset(TEventType::Load(Buffer.Get()));
+ static_cast<TEventType*>(Event.Get());
+ };
+ if constexpr (couldBeGot) {
+ if (!Event) {
+ Event.Reset(TEventType::Load(Buffer.Get()));
+ }
+
+ if (Event) {
+ return static_cast<TEventType*>(Event.Get());
+ }
+
+ Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
+ } else {
+ Y_FAIL("Event type %" PRIu32 " couldn't be get as type %s", Type, TypeName<TEventType>().data());
}
-
- if (Event) {
- return static_cast<TEventType*>(Event.Get());
- }
-
- Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
}
template <typename T>
@@ -96,104 +273,43 @@ namespace NActors {
return x;
}
- enum EFlags {
- FlagTrackDelivery = 1 << 0,
- FlagForwardOnNondelivery = 1 << 1,
- FlagSubscribeOnSession = 1 << 2,
- FlagUseSubChannel = 1 << 3,
- FlagGenerateUnsureUndelivered = 1 << 4,
- FlagExtendedFormat = 1 << 5,
- };
-
- const ui32 Type;
- const ui32 Flags;
- const TActorId Recipient;
- const TActorId Sender;
- const ui64 Cookie;
const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect
- // if set, used by ActorSystem/Interconnect to report tracepoints
- NWilson::TTraceId TraceId;
-
// filled if feeded by interconnect session
const TActorId InterconnectSession;
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- ::NHPTimer::STime SendTime;
-#endif
-
- static const size_t ChannelBits = 12;
- static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
#ifdef USE_ACTOR_CALLSTACK
TCallstack Callstack;
#endif
- ui16 GetChannel() const noexcept {
- return Flags >> ChannelShift;
- }
-
- ui64 GetSubChannel() const noexcept {
- return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL;
- }
-
- static ui32 MakeFlags(ui32 channel, ui32 flags) {
- Y_VERIFY(channel < (1 << ChannelBits));
- Y_VERIFY(flags < (1 << ChannelShift));
- return (flags | (channel << ChannelShift));
- }
-
private:
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
- TActorId RewriteRecipient;
- ui32 RewriteType;
-
- THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
-
public:
- void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
- RewriteRecipient = recipientRewrite;
- RewriteType = typeRewrite;
- }
-
- void DropRewrite() {
- RewriteRecipient = Recipient;
- RewriteType = Type;
- }
-
- const TActorId& GetRecipientRewrite() const {
- return RewriteRecipient;
- }
-
- ui32 GetTypeRewrite() const {
- return RewriteType;
- }
-
TActorId GetForwardOnNondeliveryRecipient() const {
return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
}
- IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
+ IEventHandleFat(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
- : Type(ev->Type())
- , Flags(flags)
- , Recipient(recipient)
- , Sender(sender)
- , Cookie(cookie)
- , TraceId(std::move(traceId))
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- , SendTime(0)
-#endif
+ : IEventHandle({
+ .Recipient = recipient,
+ .Sender = sender,
+ .Type = ev->Type(),
+ .Flags = flags,
+ .Cookie = cookie,
+ .RewriteRecipient = recipient,
+ .RewriteType = ev->Type(),
+ .TraceId = std::move(traceId),
+ })
, Event(ev)
- , RewriteRecipient(Recipient)
- , RewriteType(Type)
{
if (forwardOnNondelivery)
OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
}
- IEventHandle(ui32 type,
+ IEventHandleFat(ui32 type,
ui32 flags,
const TActorId& recipient,
const TActorId& sender,
@@ -201,25 +317,24 @@ namespace NActors {
ui64 cookie,
const TActorId* forwardOnNondelivery = nullptr,
NWilson::TTraceId traceId = {})
- : Type(type)
- , Flags(flags)
- , Recipient(recipient)
- , Sender(sender)
- , Cookie(cookie)
- , TraceId(std::move(traceId))
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- , SendTime(0)
-#endif
+ : IEventHandle({
+ .Recipient = recipient,
+ .Sender = sender,
+ .Type = type,
+ .Flags = flags,
+ .Cookie = cookie,
+ .RewriteRecipient = recipient,
+ .RewriteType = type,
+ .TraceId = std::move(traceId),
+ })
, Buffer(std::move(buffer))
- , RewriteRecipient(Recipient)
- , RewriteType(Type)
{
if (forwardOnNondelivery)
OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
}
// Special ctor for events from interconnect.
- IEventHandle(const TActorId& session,
+ IEventHandleFat(const TActorId& session,
ui32 type,
ui32 flags,
const TActorId& recipient,
@@ -228,20 +343,19 @@ namespace NActors {
ui64 cookie,
TScopeId originScopeId,
NWilson::TTraceId traceId) noexcept
- : Type(type)
- , Flags(flags)
- , Recipient(recipient)
- , Sender(sender)
- , Cookie(cookie)
+ : IEventHandle({
+ .Recipient = recipient,
+ .Sender = sender,
+ .Type = type,
+ .Flags = flags,
+ .Cookie = cookie,
+ .RewriteRecipient = recipient,
+ .RewriteType = type,
+ .TraceId = std::move(traceId),
+ })
, OriginScopeId(originScopeId)
- , TraceId(std::move(traceId))
, InterconnectSession(session)
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- , SendTime(0)
-#endif
, Buffer(std::move(buffer))
- , RewriteRecipient(Recipient)
- , RewriteType(Type)
{
}
@@ -286,28 +400,267 @@ namespace NActors {
TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
if (Event)
- return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
+ return new IEventHandleFat(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
else
- return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
+ return new IEventHandleFat(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
+ }
+
+ static TAutoPtr<IEventHandleFat> MakeFat(TAutoPtr<IEventHandle> ev) {
+ if (ev->IsEventLight()) {
+ Y_FAIL("Can't make light event fat");
+ } else {
+ TAutoPtr<IEventHandleFat> evb(static_cast<IEventHandleFat*>(ev.Release()));
+ return evb;
+ }
}
- TAutoPtr<IEventHandle> ForwardOnNondelivery(ui32 reason, bool unsure = false);
+ static IEventHandleFat* GetFat(IEventHandle* ev) {
+ if (ev->IsEventLight()) {
+ Y_FAIL("Can't make light event fat");
+ } else {
+ return static_cast<IEventHandleFat*>(ev);
+ }
+ }
+
+ static const IEventHandleFat* GetFat(const IEventHandle* ev) {
+ if (ev->IsEventLight()) {
+ Y_FAIL("Can't make light event fat");
+ } else {
+ return static_cast<const IEventHandleFat*>(ev);
+ }
+ }
+
+ static IEventHandleFat* GetFat(TAutoPtr<IEventHandle>& ev) {
+ return GetFat(ev.Get());
+ }
+
+ static IEventHandleFat* GetFat(THolder<IEventHandle>& ev) {
+ return GetFat(ev.Get());
+ }
+
+ static IEventHandleFat* GetFat(std::unique_ptr<IEventHandle>& ev) {
+ return GetFat(ev.get());
+ }
+
+ static TAutoPtr<IEventHandle> MakeBase(TAutoPtr<IEventHandleFat> ev) {
+ return ev.Release();
+ }
+
+ static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure = false);
};
template <typename TEventType>
- class TEventHandle: public IEventHandle {
- TEventHandle(); // we never made instance of TEventHandle
+ class TEventHandleFat: public IEventHandleFat {
+ TEventHandleFat(); // we never made instance of TEventHandleFat
public:
TEventType* Get() {
- return IEventHandle::Get<TEventType>();
+ return IEventHandleFat::Get<TEventType>();
}
TAutoPtr<TEventType> Release() {
- return IEventHandle::Release<TEventType>();
+ return IEventHandleFat::Release<TEventType>();
+ }
+ };
+
+ static_assert(sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat), "expect sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat)");
+
+ // light handle
+ class IEventHandleLight : public IEventHandle {
+ public:
+ IEventHandleLight(ui32 type) {
+ RewriteType = Type = type;
+ Light = true;
+ }
+
+ IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender) {
+ RewriteRecipient = Recipient = recipient;
+ Sender = sender;
+ return this;
+ }
+
+ IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags) {
+ RewriteRecipient = Recipient = recipient;
+ Sender = sender;
+ SetFlags(flags);
+ return this;
+ }
+
+ IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie) {
+ RewriteRecipient = Recipient = recipient;
+ Sender = sender;
+ SetFlags(flags);
+ Cookie = cookie;
+ return this;
+ }
+
+ IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) {
+ RewriteRecipient = Recipient = recipient;
+ Sender = sender;
+ SetFlags(flags);
+ Cookie = cookie;
+ TraceId = std::move(traceId);
+ return this;
+ }
+
+ void Forward(const TActorId& dest) {
+ RewriteRecipient = dest;
+ }
+
+ static IEventHandleLight* GetLight(IEventHandle* ev) {
+ if (ev->IsEventLight()) {
+ return static_cast<IEventHandleLight*>(ev);
+ } else {
+ Y_FAIL("Can't make fat event light");
+ }
+ }
+
+ static IEventHandleLight* GetLight(TAutoPtr<IEventHandle>& ev) {
+ return GetLight(ev.Get());
+ }
+
+ static IEventHandleLight* GetLight(THolder<IEventHandle>& ev) {
+ return GetLight(ev.Get());
+ }
+
+ static IEventHandleLight* GetLight(std::unique_ptr<IEventHandle>& ev) {
+ return GetLight(ev.get());
+ }
+
+ static IEventHandleLight* ReleaseLight(TAutoPtr<IEventHandle>& ev) {
+ return GetLight(ev.Release());
+ }
+
+ static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure = false);
+
+ template<typename TEventType>
+ TEventType* Get() {
+ if (Type != TEventType::EventType) {
+ Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
+ }
+ constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);};
+ if constexpr (couldBeConverted) {
+ return static_cast<TEventType*>(this);
+ } else {
+ Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data());
+ }
+ }
+
+ template<typename TEventType>
+ TEventType* CastAsLocal() {
+ constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);};
+ if constexpr (couldBeConverted) {
+ if (Type == TEventType::EventType) {
+ return static_cast<TEventType*>(this);
+ }
+ }
+ return nullptr;
+ }
+
+ template<typename TEventType>
+ TEventType* StaticCastAsLocal() {
+ constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);};
+ if constexpr (couldBeConverted) {
+ return static_cast<TEventType*>(this);
+ }
+ Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data());
}
};
- static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)");
+ template<typename TEventType>
+ TEventType* IEventHandle::Get() {
+ if (IsEventLight()) {
+ return IEventHandleLight::GetLight(this)->Get<TEventType>();
+ } else {
+ return IEventHandleFat::GetFat(this)->Get<TEventType>();
+ }
+ }
+
+ template<typename TEventType>
+ TEventType* IEventHandle::CastAsLocal() {
+ if (IsEventLight()) {
+ return IEventHandleLight::GetLight(this)->CastAsLocal<TEventType>();
+ } else {
+ return IEventHandleFat::GetFat(this)->CastAsLocal<TEventType>();
+ }
+ }
+
+ template<typename TEventType>
+ TEventType* IEventHandle::StaticCastAsLocal() {
+ if (IsEventLight()) {
+ return IEventHandleLight::GetLight(this)->StaticCastAsLocal<TEventType>();
+ } else {
+ return IEventHandleFat::GetFat(this)->StaticCastAsLocal<TEventType>();
+ }
+ }
+
+ template<typename TEventType>
+ TEventType* IEventHandle::Release(TAutoPtr<IEventHandle>& ev) {
+ if (ev->IsEventLight()) {
+ return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>();
+ } else {
+ return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release();
+ }
+ }
+
+ template<typename TEventType>
+ TEventType* IEventHandle::Release(THolder<IEventHandle>& ev) {
+ if (ev->IsEventLight()) {
+ return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>();
+ } else {
+ return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release();
+ }
+ }
+
+ template<typename TEventType>
+ TEventType* IEventHandle::Release(std::unique_ptr<IEventHandle>& ev) {
+ if (ev->IsEventLight()) {
+ return IEventHandleLight::GetLight(ev.release())->CastAsLocal<TEventType>();
+ } else {
+ return IEventHandleFat::GetFat(ev.get())->Release<TEventType>().Release();
+ }
+ }
+
+
+ class IEventHandleLightSerializable;
+
+ using TEventSerializer = std::function<bool(const IEventHandleLightSerializable*, TChunkSerializer*)>;
+
+ class IEventHandleLightSerializable : public IEventHandleLight {
+ public:
+ TEventSerializer Serializer;
+
+ IEventHandleLightSerializable(ui32 type, TEventSerializer serializer)
+ : IEventHandleLight(type)
+ , Serializer(std::move(serializer))
+ {
+ Serializable = true;
+ }
+
+ static IEventHandleLightSerializable* GetLightSerializable(IEventHandle* ev) {
+ if (ev->IsEventSerializable()) {
+ return static_cast<IEventHandleLightSerializable*>(ev);
+ } else {
+ Y_FAIL("Can't make serializable event");
+ }
+ }
+
+ static const IEventHandleLightSerializable* GetLightSerializable(const IEventHandle* ev) {
+ if (ev->IsEventSerializable()) {
+ return static_cast<const IEventHandleLightSerializable*>(ev);
+ } else {
+ Y_FAIL("Can't make serializable event");
+ }
+ }
+
+ static IEventHandleLightSerializable* GetLightSerializable(TAutoPtr<IEventHandle>& ev) {
+ return GetLightSerializable(ev.Get());
+ }
+
+ size_t GetSize() const {
+ // TODO(xenoxeno)
+ return 0;
+ }
+ };
template <typename TEventType, ui32 EventType0>
class TEventBase: public IEventBase {
@@ -318,7 +671,7 @@ namespace NActors {
}
// still abstract
- typedef TEventHandle<TEventType> THandle;
+ typedef TEventHandleFat<TEventType> THandle;
typedef TAutoPtr<THandle> TPtr;
};
@@ -327,7 +680,7 @@ namespace NActors {
return TString(header); \
} \
bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
- Y_FAIL("Local event " #eventType " is not serializable"); \
+ Y_FAIL("Local event %s is not serializable", TypeName(*this).data()); \
} \
static IEventBase* Load(NActors::TEventSerializedData*) { \
Y_FAIL("Local event " #eventType " has no load method"); \
@@ -349,4 +702,39 @@ namespace NActors {
bool IsSerializable() const override { \
return true; \
}
+
+
+ struct TEventMeta {
+ TActorId Session;
+ ui32 Type;
+ ui32 Flags;
+ TActorId Recipient;
+ TActorId Sender;
+ ui64 Cookie;
+ TScopeId OriginScopeId;
+ NWilson::TTraceId TraceId;
+ TRope Data;
+
+ bool IsExtendedFormat() const {
+ return Flags & IEventHandle::FlagExtendedFormat;
+ }
+ };
+
+ inline constexpr ui32 GetEventSpace(ui32 eventType) {
+ return (eventType >> 16u);
+ }
+
+ inline constexpr ui32 GetEventSubType(ui32 eventType) {
+ return (eventType & (0xffff));
+ }
+
+ struct IEventFactory {
+ virtual IEventHandle* Construct(const TEventMeta& eventMeta) = 0;
+ virtual void Destruct(IEventHandle*) = 0;
+ };
+
+ struct TEventFactories {
+ // it supposed to be statically allocated in the begining
+ static std::vector<std::vector<IEventFactory*>*> EventFactories; // [EventSpace][EventSubType]
+ };
}
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index 30cc26aa463..b845760221e 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -7,7 +7,7 @@
#include <library/cpp/actors/wilson/wilson_trace.h>
namespace NActors {
- class IEventHandle;
+ class IEventHandleFat;
struct TConstIoVec {
const void* Data;
diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h
index 2845aa94dd9..5288a65a894 100644
--- a/library/cpp/actors/core/event_local.h
+++ b/library/cpp/actors/core/event_local.h
@@ -71,4 +71,20 @@ namespace NActors {
return new TEv();
}
};
+
+ template<typename TEv, ui32 EventType0>
+ class TEventLight : public IEventHandleLight {
+ public:
+ static constexpr ui32 EventType = EventType0;
+
+ TEventLight()
+ : IEventHandleLight(EventType)
+ {}
+
+ TEv* Get() {
+ return static_cast<TEv*>(this);
+ }
+
+ using TPtr = TAutoPtr<TEv>;
+ };
}
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 594559cea78..ed696dc0f90 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -9,6 +9,8 @@
#include <util/generic/deque.h>
#include <util/system/context.h>
#include <util/system/filemap.h>
+#include <util/string/builder.h>
+#include <util/thread/lfstack.h>
#include <array>
namespace NActors {
@@ -511,4 +513,290 @@ namespace NActors {
dest->SetRawX1(src.RawX1());
dest->SetRawX2(src.RawX2());
}
+
+
+
+
+
+
+
+
+
+
+
+ template<ui32 EventSpace>
+ struct TLightEventSpaceFactories {
+ public:
+ static std::vector<IEventFactory*>& GetEventSpaceFactories(ui32 eventSpace) {
+ if (TEventFactories::EventFactories.size() <= eventSpace) {
+ // it's only safe to do BEFORE initialization of ActorSystem
+ TEventFactories::EventFactories.resize(eventSpace + 1);
+ }
+ if (TEventFactories::EventFactories[eventSpace] == nullptr) {
+ TEventFactories::EventFactories[eventSpace] = new std::vector<IEventFactory*>();
+ }
+ return *TEventFactories::EventFactories[eventSpace];
+ }
+
+ static void SetEventFactory(ui32 eventType, IEventFactory* factory) {
+ Y_VERIFY(GetEventSpace(eventType) == EventSpace);
+ auto eventSpaceFactories = GetEventSpaceFactories(GetEventSpace(eventType));
+ if (eventSpaceFactories.size() <= GetEventSubType(eventType)) {
+ // it's only safe to do BEFORE initialization of ActorSystem
+ eventSpaceFactories.resize(GetEventSubType(eventType) + 1);
+ }
+ eventSpaceFactories[GetEventSubType(eventType)] = factory;
+ }
+ };
+
+ template<typename TEvent>
+ class TLightEventFactory : public IEventFactory, TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)> {
+ private:
+ mutable size_t CachedByteSize = 0;
+
+ static constexpr char PayloadMarker = 0x07;
+ static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
+
+ static size_t SerializeNumber(size_t num, char *buffer) {
+ char *begin = buffer;
+ do {
+ *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00);
+ num >>= 7;
+ } while (num);
+ return buffer - begin;
+ }
+
+ static size_t DeserializeNumber(const char **ptr, const char *end) {
+ const char *p = *ptr;
+ size_t res = 0;
+ size_t offset = 0;
+ for (;;) {
+ if (p == end) {
+ return Max<size_t>();
+ }
+ const char byte = *p++;
+ res |= (static_cast<size_t>(byte) & 0x7F) << offset;
+ offset += 7;
+ if (!(byte & 0x80)) {
+ break;
+ }
+ }
+ *ptr = p;
+ return res;
+ }
+
+ static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) {
+ size_t res = 0;
+ size_t offset = 0;
+ for (;;) {
+ if (!iter.Valid()) {
+ return Max<size_t>();
+ }
+ const char byte = *iter.ContiguousData();
+ iter += 1;
+ --size;
+ res |= (static_cast<size_t>(byte) & 0x7F) << offset;
+ offset += 7;
+ if (!(byte & 0x80)) {
+ break;
+ }
+ }
+ return res;
+ }
+
+ public:
+ void DeserializeProtoEvent(TEvent* event, const TEventMeta& eventMeta) {
+ TRope::TConstIterator iter = eventMeta.Data.Begin();
+ ui64 size = eventMeta.Data.GetSize();
+ if (eventMeta.IsExtendedFormat()) {
+ constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;};
+ if constexpr (hasPayload) {
+ // check marker
+ if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
+ Y_FAIL("Invalid event marker");
+ }
+ // skip marker
+ iter += 1;
+ --size;
+ // parse number of payload ropes
+ size_t numRopes = DeserializeNumber(iter, size);
+ if (numRopes == Max<size_t>()) {
+ Y_FAIL("Invalid event rope number");
+ }
+ while (numRopes--) {
+ // parse length of the rope
+ const size_t len = DeserializeNumber(iter, size);
+ if (len == Max<size_t>() || size < len) {
+ Y_FAIL("%s", (TStringBuilder() << "Invalid event len# " << len << " size# " << size).data());
+ }
+ // extract the rope
+ TRope::TConstIterator begin = iter;
+ iter += len;
+ size -= len;
+ event->Payload.emplace_back(begin, iter);
+ }
+ } else {
+ Y_FAIL("%s", (TStringBuilder() << "Extended format is not supported for event " << TypeName<TEvent>()).data());
+ }
+ }
+ // parse the protobuf
+ TRopeStream stream(iter, size);
+ if (!event->Record.ParsePartialFromZeroCopyStream(&stream)) {
+ Y_FAIL("%s", (TStringBuilder() << "Failed to parse protobuf event type " << eventMeta.Type << " class " << TypeName<TEvent>()).data());
+ }
+ }
+
+ static bool SerializeProtoEvent(const TEvent* event, TChunkSerializer* chunker) {
+ constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;};
+ if constexpr (hasPayload) {
+ // serialize payload first
+ if (event->Payload) {
+ void *data;
+ int size = 0;
+ auto append = [&](const char *p, size_t len) {
+ while (len) {
+ if (size) {
+ const size_t numBytesToCopy = std::min<size_t>(size, len);
+ memcpy(data, p, numBytesToCopy);
+ data = static_cast<char*>(data) + numBytesToCopy;
+ size -= numBytesToCopy;
+ p += numBytesToCopy;
+ len -= numBytesToCopy;
+ } else if (!chunker->Next(&data, &size)) {
+ return false;
+ }
+ }
+ return true;
+ };
+ auto appendNumber = [&](size_t number) {
+ char buf[MaxNumberBytes];
+ return append(buf, SerializeNumber(number, buf));
+ };
+ char marker = PayloadMarker;
+ append(&marker, 1);
+ if (!appendNumber(event->Payload.size())) {
+ return false;
+ }
+ for (const TRope& rope : event->Payload) {
+ if (!appendNumber(rope.GetSize())) {
+ return false;
+ }
+ if (rope) {
+ if (size) {
+ chunker->BackUp(std::exchange(size, 0));
+ }
+ if (!chunker->WriteRope(&rope)) {
+ return false;
+ }
+ }
+ }
+ if (size) {
+ chunker->BackUp(size);
+ }
+ }
+ }
+ return event->Record.SerializeToZeroCopyStream(chunker);
+ }
+
+ public:
+ // TLockFreeStack<TEvent*> Stack;
+ // std::atomic<i32> StackSize;
+ // //std::deque<TEvent*> Stack;
+
+ // static constexpr i32 MAX_STACK_SIZE = 10;
+
+ TEvent* New() {
+ // TEvent* result;
+ // if (Stack.Dequeue(&result)) {
+ // --StackSize;
+ // return result;
+ // }
+ // /*if (!Stack.empty()) {
+ // TEvent* result = Stack.back();
+ // Stack.pop_back();
+ // return result;
+ // }*/
+ return new TEvent();
+ }
+
+ void Delete(TEvent* event) {
+ // if (StackSize < MAX_STACK_SIZE) {
+ // Stack.Enqueue(event);
+ // ++StackSize;
+ // } else {
+ // delete event;
+ // }
+ // /*if (Stack.size() < MAX_STACK_SIZE) {
+ // event->Record.Clear();
+ // Stack.push_back(event);
+ // return;
+ // }*/
+ delete event;
+ }
+
+ virtual IEventHandle* Construct(const TEventMeta& eventMeta) override {
+ IEventHandle* ev = nullptr;
+ if (eventMeta.Type == TEvent::EventType) {
+ TEvent* event = New();
+ DeserializeProtoEvent(event, eventMeta);
+ }
+ return ev;
+ }
+
+ virtual void Destruct(IEventHandle* ev) override {
+ if (ev->Type != TEvent::EventType) {
+ Y_FAIL("Wrong event supplied");
+ }
+ Delete(static_cast<TEvent*>(ev));
+ }
+
+ TLightEventFactory() {
+ Cerr << "TLightEventFactory<" << TypeName<TEvent>() << ">()" << Endl;
+ TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)>::SetEventFactory(TEvent::EventType, this);
+ }
+ };
+
+ template<typename TEv>
+ struct TLightEventFactoryInitializer {
+ static TLightEventFactory<TEv> EventFactory;
+
+ static TEv* New() {
+ return EventFactory.New();
+ }
+
+ static void Delete(TEv* event) {
+ EventFactory.Delete(event);
+ }
+ };
+
+ template<typename TEv>
+ TLightEventFactory<TEv> TLightEventFactoryInitializer<TEv>::EventFactory;
+
+ template<typename TEv, typename TRecord, ui32 EventType0>
+ class TEventLightPB : public IEventHandleLightSerializable, public TLightEventFactoryInitializer<TEv> {
+ public:
+ static constexpr ui32 EventType = EventType0;
+
+ TRecord Record;
+
+ static bool SerializeProto(const IEventHandleLightSerializable* event, TChunkSerializer* chunker) {
+ return TLightEventFactory<TEv>::SerializeProtoEvent(static_cast<const TEv*>(event), chunker);
+ }
+
+ TEventLightPB()
+ : IEventHandleLightSerializable(EventType, &TEventLightPB<TEv, TRecord, EventType0>::SerializeProto)
+ {}
+
+ TEv* Get() {
+ return static_cast<TEv*>(this);
+ }
+
+ using TPtr = TAutoPtr<TEv>;
+ };
+
+ template<typename TEv, typename TRecord, ui32 EventType0>
+ class TEventLightPBWithPayload : public TEventLightPB<TEv, TRecord, EventType0> {
+ public:
+ std::vector<TRope> Payload;
+ };
}
diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp
index dfd79bf96e9..ea5fd101243 100644
--- a/library/cpp/actors/core/events_undelivered.cpp
+++ b/library/cpp/actors/core/events_undelivered.cpp
@@ -38,23 +38,70 @@ namespace NActors {
return new TEvUndelivered(sourceType, reason);
}
- TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(ui32 reason, bool unsure) {
- if (Flags & FlagForwardOnNondelivery) {
- const ui32 updatedFlags = Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession);
- const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId();
+ TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure) {
+ std::unique_ptr<IEventHandle> tev(ev.Release());
+ TAutoPtr<IEventHandle> fw = ForwardOnNondelivery(tev, reason, unsure);
+ if (tev) {
+ // we don't want to delete original event handle here
+ ev = tev.release();
+ }
+ return fw;
+ }
+
+ TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure) {
+ if (ev->IsEventFat()) {
+ std::unique_ptr<IEventHandleFat> evf(IEventHandleFat::GetFat(ev.release()));
+ std::unique_ptr<IEventHandle> fw = IEventHandleFat::ForwardOnNondelivery(evf, reason, unsure);
+ if (evf) {
+ ev = std::unique_ptr<IEventHandle>(evf.release());
+ }
+ return fw.release();
+ }
+ if (ev->IsEventLight()) {
+ std::unique_ptr<IEventHandleLight> evl(IEventHandleLight::GetLight(ev.release()));
+ std::unique_ptr<IEventHandle> fw = IEventHandleLight::ForwardOnNondelivery(evl, reason, unsure);
+ if (evl) {
+ ev = std::unique_ptr<IEventHandle>(evl.release());
+ }
+ return fw.release();
+ }
+ return {};
+ }
- if (Event)
- return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, std::move(TraceId));
+ std::unique_ptr<IEventHandle> IEventHandleFat::ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure) {
+ if (ev->ForwardOnNondeliveryFlag) {
+ const ui32 updatedFlags = ev->Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession);
+ const TActorId recp = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId();
+
+ if (ev->Event)
+ return std::unique_ptr<IEventHandle>(new IEventHandleFat(recp, ev->Sender, ev->Event.Release(), updatedFlags, ev->Cookie, &ev->Recipient, std::move(ev->TraceId)));
else
- return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, std::move(TraceId));
+ return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Type, updatedFlags, recp, ev->Sender, ev->Buffer, ev->Cookie, &ev->Recipient, std::move(ev->TraceId)));
}
- if (Flags & FlagTrackDelivery) {
- const ui32 updatedFlags = Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered);
- return new IEventHandle(Sender, Recipient, new TEvents::TEvUndelivered(Type, reason, unsure), updatedFlags,
- Cookie, nullptr, std::move(TraceId));
+ if (ev->TrackDelivery) {
+ const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered);
+ return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags,
+ ev->Cookie, nullptr, std::move(ev->TraceId)));
}
+ return {};
+ }
- return nullptr;
+ std::unique_ptr<IEventHandle> IEventHandleLight::ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure) {
+ if (ev->ForwardOnNondeliveryFlag) {
+ ev->ForwardOnNondeliveryFlag = false;
+ ev->SubscribeOnSession = false;
+ auto recpt = ev->Recipient;
+ ev->Recipient = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId();
+ ev->OnNondeliveryHolder = MakeHolder<TOnNondelivery>(recpt);
+ return std::unique_ptr<IEventHandle>(ev.release());
+ }
+
+ if (ev->TrackDelivery) {
+ const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered);
+ return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags,
+ ev->Cookie, nullptr, std::move(ev->TraceId)));
+ }
+ return {};
}
}
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 3c5dc2c2b4d..50ae097a15c 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -96,13 +96,12 @@ namespace NActors {
inline void LwTraceSlowDelivery(IEventHandle* ev, const std::type_info* actorType, ui32 poolId, const TActorId& currentRecipient,
double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) {
- const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr;
LWPROBE(EventSlowDelivery,
poolId,
delivMs,
sinceActivationMs,
eventsExecutedBefore,
- baseEv ? SafeTypeName(baseEv) : (ev ? ToString(ev->Type) : TString("nullptr")),
+ ev && ev->HasEvent() ? ev->GetTypeName() : (ev ? ToString(ev->Type) : TString("nullptr")),
currentRecipient.ToString(),
SafeTypeName(actorType));
}
@@ -110,11 +109,10 @@ namespace NActors {
inline void LwTraceSlowEvent(IEventHandle* ev, ui32 evTypeForTracing, const std::type_info* actorType, ui32 poolId,
const TActorId& currentRecipient, double eventMs) {
// Event could have been destroyed by actor->Receive();
- const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr;
LWPROBE(SlowEvent,
poolId,
eventMs,
- baseEv ? SafeTypeName(baseEv) : ToString(evTypeForTracing),
+ ev && ev->HasEvent() ? ev->GetTypeName() : ToString(evTypeForTracing),
currentRecipient.ToString(),
SafeTypeName(actorType));
}
@@ -178,7 +176,7 @@ namespace NActors {
NProfiling::TMemoryTagScope::Reset(ActorSystem->MemProfActivityBase + activityType);
}
- actor->Receive(ev, ctx);
+ actor->Receive(ev);
size_t dyingActorsCnt = DyingActors.size();
Ctx.UpdateActorsStats(dyingActorsCnt);
@@ -204,7 +202,7 @@ namespace NActors {
} else {
actorType = nullptr;
- TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
+ TAutoPtr<IEventHandle> nonDelivered = IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown);
if (nonDelivered.Get()) {
ActorSystem->Send(nonDelivered);
} else {
diff --git a/library/cpp/actors/core/io_dispatcher.cpp b/library/cpp/actors/core/io_dispatcher.cpp
index 6bd753f2e04..cccb2336ffc 100644
--- a/library/cpp/actors/core/io_dispatcher.cpp
+++ b/library/cpp/actors/core/io_dispatcher.cpp
@@ -116,7 +116,7 @@ namespace NActors {
bool sendNotify;
if (!Actor.TaskQueue.Dequeue(tasks, &sendNotify)) {
if (sendNotify) {
- ActorSystem->Send(new IEventHandle(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(),
+ ActorSystem->Send(new IEventHandleFat(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(),
nullptr, TThread::CurrentThreadId()));
}
break;
diff --git a/library/cpp/actors/core/io_dispatcher.h b/library/cpp/actors/core/io_dispatcher.h
index b0e4e60d1a7..a33cb0d98e6 100644
--- a/library/cpp/actors/core/io_dispatcher.h
+++ b/library/cpp/actors/core/io_dispatcher.h
@@ -28,7 +28,7 @@ namespace NActors {
*/
template<typename TCallback>
static void InvokeIoCallback(TCallback&& callback, ui32 poolId, IActor::EActivityType activityType) {
- if (!TActivationContext::Send(new IEventHandle(MakeIoDispatcherActorId(), TActorId(),
+ if (!TActivationContext::Send(new IEventHandleFat(MakeIoDispatcherActorId(), TActorId(),
new TEvInvokeQuery(callback)))) {
TActivationContext::Register(CreateExecuteLaterActor(std::move(callback), activityType), TActorId(),
TMailboxType::HTSwap, poolId);
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index 5000216f50c..c7bed09f1ba 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -116,7 +116,7 @@
const auto& currentTracer = component; \
if (ev->HasEvent()) { \
LOG_TRACE(*TlsActivationContext, currentTracer, "%s, received event# %" PRIu32 ", Sender %s, Recipient %s: %s", \
- __FUNCTION__, ev->Type, ev->Sender.ToString().data(), SelfId().ToString().data(), ev->GetBase()->ToString().substr(0, 1000).data()); \
+ __FUNCTION__, ev->Type, ev->Sender.ToString().data(), SelfId().ToString().data(), ev->ToString().substr(0, 1000).data()); \
} else { \
LOG_TRACE(*TlsActivationContext, currentTracer, "%s, received event# %" PRIu32 ", Sender %s, Recipient %s", \
__FUNCTION__, ev->Type, ev->Sender.ToString().data(), ev->Recipient.ToString().data()); \
@@ -205,7 +205,7 @@ namespace NActors {
std::shared_ptr<NMonitoring::TMetricRegistry> metrics);
~TLoggerActor();
- void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+ void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
switch (ev->GetTypeRewrite()) {
HFunc(TFlushLogBuffer, FlushLogBufferMessageEvent);
HFunc(NLog::TEvLog, HandleLogEvent);
@@ -316,7 +316,7 @@ namespace NActors {
{
const NLog::TSettings *mSettings = ctx.LoggerSettings();
TLoggerActor::Throttle(*mSettings);
- ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
+ ctx.Send(new IEventHandleFat(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
}
template <typename TCtx, typename... TArgs>
diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp
index 995e3c4121c..572af788ec9 100644
--- a/library/cpp/actors/core/log_ut.cpp
+++ b/library/cpp/actors/core/log_ut.cpp
@@ -105,19 +105,19 @@ namespace {
{}
void WriteLog() {
- Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")});
+ Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")});
}
void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg, TString msg = "foo") {
- Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)});
+ Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)});
}
void FlushLogBuffer() {
- Runtime.Send(new IEventHandle{LoggerActor, {}, new TFlushLogBuffer()});
+ Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TFlushLogBuffer()});
}
void Wakeup() {
- Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup});
+ Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvents::TEvWakeup});
}
TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()};
diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp
index febc5e40dd2..fef051da619 100644
--- a/library/cpp/actors/core/scheduler_actor.cpp
+++ b/library/cpp/actors/core/scheduler_actor.cpp
@@ -234,7 +234,7 @@ namespace NActors {
sentCount = Min(eventsToBeSentSize, Cfg.RelaxedSendPaceEventsPerCycle);
}
for (ui32 i = 0; i < sentCount; ++i) {
- ctx.Send(EventsToBeSent.front());
+ ctx.Send(EventsToBeSent.front().Release());
EventsToBeSent.pop_front();
}