aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-03-28 14:31:05 +0300
committerxenoxeno <xeno@ydb.tech>2023-03-28 14:31:05 +0300
commit33421d638103cc382ba851d2491740e2db576307 (patch)
tree166e19b62c40deb088b62651e2a0cb86d4ed8f5c /library/cpp
parent8cf3b1d08aa8791cd5cb7ee2a11fbb712cd72d16 (diff)
downloadydb-33421d638103cc382ba851d2491740e2db576307.tar.gz
revert light events
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/actor.cpp36
-rw-r--r--library/cpp/actors/core/actor.h178
-rw-r--r--library/cpp/actors/core/actor_bootstrapped.h2
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp4
-rw-r--r--library/cpp/actors/core/actor_coroutine.h7
-rw-r--r--library/cpp/actors/core/actor_ut.cpp16
-rw-r--r--library/cpp/actors/core/actor_virtual.h8
-rw-r--r--library/cpp/actors/core/actorsystem.cpp14
-rw-r--r--library/cpp/actors/core/actorsystem.h2
-rw-r--r--library/cpp/actors/core/ask.cpp4
-rw-r--r--library/cpp/actors/core/av_bootstrapped.cpp2
-rw-r--r--library/cpp/actors/core/event.cpp74
-rw-r--r--library/cpp/actors/core/event.h643
-rw-r--r--library/cpp/actors/core/event_load.h2
-rw-r--r--library/cpp/actors/core/event_local.h17
-rw-r--r--library/cpp/actors/core/event_pb.h249
-rw-r--r--library/cpp/actors/core/events_undelivered.cpp60
-rw-r--r--library/cpp/actors/core/executor_thread.cpp4
-rw-r--r--library/cpp/actors/core/io_dispatcher.cpp2
-rw-r--r--library/cpp/actors/core/io_dispatcher.h2
-rw-r--r--library/cpp/actors/core/log.h2
-rw-r--r--library/cpp/actors/core/log_ut.cpp8
-rw-r--r--library/cpp/actors/dnsresolver/dnsresolver.cpp4
-rw-r--r--library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp10
-rw-r--r--library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp2
-rw-r--r--library/cpp/actors/dnsresolver/dnsresolver_ut.cpp12
-rw-r--r--library/cpp/actors/examples/02_discovery/lookup.cpp2
-rw-r--r--library/cpp/actors/helpers/flow_controlled_queue.cpp9
-rw-r--r--library/cpp/actors/helpers/future_callback.h27
-rw-r--r--library/cpp/actors/helpers/selfping_actor_ut.cpp2
-rw-r--r--library/cpp/actors/http/http_proxy_incoming.cpp2
-rw-r--r--library/cpp/actors/http/http_ut.cpp30
-rw-r--r--library/cpp/actors/interconnect/event_filter.h2
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h10
-rw-r--r--library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_resolve.cpp6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp26
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h23
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h12
-rw-r--r--library/cpp/actors/interconnect/load.cpp2
-rw-r--r--library/cpp/actors/interconnect/mock/ic_mock.cpp57
-rw-r--r--library/cpp/actors/interconnect/packet.cpp25
-rw-r--r--library/cpp/actors/interconnect/packet.h10
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp2
-rw-r--r--library/cpp/actors/interconnect/types.h2
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/interconnect_ut.cpp38
-rw-r--r--library/cpp/actors/interconnect/ut/poller_actor_ut.cpp2
-rw-r--r--library/cpp/actors/testlib/decorator_ut.cpp10
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp34
-rw-r--r--library/cpp/actors/testlib/test_runtime.h4
-rw-r--r--library/cpp/actors/wilson/wilson_span.cpp2
-rw-r--r--library/cpp/actors/wilson/wilson_uploader.cpp2
58 files changed, 364 insertions, 1375 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index 304f7405ae..00eef387ea 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -16,22 +16,6 @@ namespace NActors {
return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId));
}
- bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept {
- return SelfActorId.Send(recipient, ev);
- }
-
- bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept {
- return SelfActorId.Send(recipient, ev, flags);
- }
-
- bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept {
- return SelfActorId.Send(recipient, ev, flags, cookie);
- }
-
- bool IActor::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept {
- return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId));
- }
-
void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
}
@@ -45,15 +29,15 @@ namespace NActors {
}
void TActorIdentity::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie);
+ return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie);
}
void TActorIdentity::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- return TActivationContext::Schedule(deadline, new IEventHandleFat(*this, {}, ev), cookie);
+ return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie);
}
void TActorIdentity::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const {
- return TActivationContext::Schedule(delta, new IEventHandleFat(*this, {}, ev), cookie);
+ return TActivationContext::Schedule(delta, new IEventHandle(*this, {}, ev), cookie);
}
TActorId TActivationContext::RegisterWithSameMailbox(IActor* actor, TActorId parentId) {
@@ -91,27 +75,27 @@ namespace NActors {
}
void TActorContext::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie);
+ ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
}
void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfID, TActorId(), ev), cookie);
+ ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
}
void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(delta, new IEventHandleFat(SelfID, TActorId(), ev), cookie);
+ ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie);
}
void IActor::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie);
+ TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie);
+ TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandleFat(SelfActorId, TActorId(), ev), cookie);
+ TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
TInstant TActivationContext::Now() {
@@ -161,7 +145,7 @@ namespace NActors {
(actor->*StateFunc)(ev, TActivationContext::AsActorContext());
}
- void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev) {
+ void TActorVirtualBehaviour::Receive(IActor* actor, std::unique_ptr<IEventHandle> ev) {
Y_VERIFY(!!ev && ev->GetBase());
ev->GetBase()->Execute(actor, std::move(ev));
}
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 5e04c0f2ab..e1232c01a8 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -139,16 +139,10 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
template <ESendingType SendingType = ESendingType::Common>
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
- template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
template <ESendingType SendingType = ESendingType::Common>
- bool Send(const TActorId& recipient, THolder<IEventHandleLight> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
- return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId));
- }
- template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId));
}
@@ -228,14 +222,6 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
- template <ESendingType SendingType = ESendingType::Common>
- bool Send(const TActorId& recipient, IEventHandleLight* ev) const;
- template <ESendingType SendingType = ESendingType::Common>
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const;
- template <ESendingType SendingType = ESendingType::Common>
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const;
- template <ESendingType SendingType = ESendingType::Common>
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const;
bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
@@ -284,7 +270,7 @@ namespace NActors {
class TActorVirtualBehaviour {
public:
- static void Receive(IActor* actor, std::unique_ptr<IEventHandleFat> ev);
+ static void Receive(IActor* actor, std::unique_ptr<IEventHandle> ev);
public:
};
@@ -333,8 +319,6 @@ namespace NActors {
friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
friend class TDecorator;
protected:
- using TReceiveFuncLight = void (IActor::*)(TAutoPtr<IEventHandle>& ev);
- TReceiveFuncLight StateFuncLight = nullptr;
TActorCallbackBehaviour CImpl;
public:
using TReceiveFunc = TActorCallbackBehaviour::TReceiveFunc;
@@ -512,17 +496,13 @@ namespace NActors {
TActorIdentity SelfId() const {
return SelfActorId;
}
- // void Receive(TAutoPtr<IEventHandle> ev, const TActorContext& /*ctx*/) {
- // Receive(ev);
- // }
- void Receive(TAutoPtr<IEventHandle>& ev) {
+
+ void Receive(TAutoPtr<IEventHandle>& ev, const TActorContext& /*ctx*/) {
++HandledEvents;
- if (StateFuncLight) {
- (this->*StateFuncLight)(ev);
- } else if (CImpl.Initialized()) {
+ if (CImpl.Initialized()) {
CImpl.Receive(this, ev);
} else {
- TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandleFat>(IEventHandleFat::MakeFat(ev).Release()));
+ TActorVirtualBehaviour::Receive(this, std::unique_ptr<IEventHandle>(ev.Release()));
}
}
@@ -535,10 +515,6 @@ namespace NActors {
void Describe(IOutputStream&) const noexcept override;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
- bool Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept;
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const noexcept;
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const noexcept;
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept;
bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
@@ -639,78 +615,8 @@ namespace NActors {
}
};
- template<typename TDerived>
- class TActorCallback : public IActorCallback {
- public:
- using TFatReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&, const TActorContext&);
- using TLightReceiveFunc = void (TDerived::*)(TAutoPtr<IEventHandle>&);
-
- private:
- void BecomeFat(TFatReceiveFunc stateFunc) {
- IActorCallback::Become(stateFunc);
- }
-
- template<typename... TArgs>
- void BecomeFat(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) {
- IActorCallback::Become(stateFunc, ctx, std::forward<TArgs>(args)...);
- }
-
- template<typename... TArgs>
- void BecomeFat(TFatReceiveFunc stateFunc, TArgs&&... args) {
- IActorCallback::Become(stateFunc, std::forward<TArgs>(args)...);
- }
-
- void BecomeLight(TLightReceiveFunc stateFuncLight) {
- StateFuncLight = static_cast<TReceiveFuncLight>(stateFuncLight);
- }
-
- public:
- TActorCallback(TFatReceiveFunc stateFunc, ui32 activityType = OTHER)
- : IActorCallback(static_cast<TReceiveFunc>(stateFunc), activityType)
- {
- }
-
- TActorCallback(TLightReceiveFunc stateFunc, ui32 activityType = OTHER)
- : IActorCallback(nullptr, activityType)
- {
- Become(stateFunc);
- }
-
- template<typename T>
- void Become(T stateFunc) {
- IActorCallback::Become(stateFunc);
- }
-
- template<typename T, typename... TArgs>
- void Become(T stateFunc, const TActorContext&, TArgs&&... args) {
- IActorCallback::Become(stateFunc);
- Schedule(std::forward<TArgs>(args)...);
- }
-
- template<typename T, typename... TArgs>
- void Become(T stateFunc, TArgs&&... args) {
- IActorCallback::Become(stateFunc);
- Schedule(std::forward<TArgs>(args)...);
- }
-
- template<>
- void Become<TLightReceiveFunc>(TLightReceiveFunc stateFunc) {
- BecomeLight(stateFunc);
- }
-
- template<>
- void Become<TFatReceiveFunc>(TFatReceiveFunc stateFunc) {
- BecomeFat(stateFunc);
- }
-
- template<typename... TArgs>
- void Become(TFatReceiveFunc stateFunc, const TActorContext& ctx, TArgs&&... args) {
- BecomeFat(stateFunc, ctx, std::forward<TArgs>(args)...);
- }
- };
-
template <typename TDerived>
- class TActor: public TActorCallback<TDerived> {
+ class TActor: public IActorCallback {
private:
template <typename T, typename = const char*>
struct HasActorName: std::false_type { };
@@ -741,13 +647,8 @@ namespace NActors {
// static constexpr char ActorName[] = "UNNAMED";
- TActor(typename TActorCallback<TDerived>::TFatReceiveFunc func, ui32 activityType = GetActivityTypeIndex())
- : TActorCallback<TDerived>(func, activityType)
- {
- }
-
- TActor(typename TActorCallback<TDerived>::TLightReceiveFunc func, ui32 activityType = GetActivityTypeIndex())
- : TActorCallback<TDerived>(func, activityType)
+ TActor(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext&), ui32 activityType = GetActivityTypeIndex())
+ : IActorCallback(static_cast<TReceiveFunc>(func), activityType)
{
}
@@ -758,10 +659,8 @@ namespace NActors {
#define STFUNC_SIG TAutoPtr< ::NActors::IEventHandle>&ev, const ::NActors::TActorContext &ctx
#define STATEFN_SIG TAutoPtr<::NActors::IEventHandle>& ev
-#define LIGHTFN_SIG TAutoPtr<::NActors::IEventHandle>& ev
#define STFUNC(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& ctx)
#define STATEFN(funcName) void funcName(TAutoPtr< ::NActors::IEventHandle>& ev, const ::NActors::TActorContext& )
-#define LIGHTFN(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)
#define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_VERIFY_DEBUG(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype);
@@ -773,26 +672,13 @@ namespace NActors {
UNHANDLED_MSG_HANDLER \
}
-#define LIGHTFN_BODY(HANDLERS, UNHANDLED_MSG_HANDLER) \
- switch (const ui32 etype = ev->GetTypeRewrite()) { \
- HANDLERS \
- default: \
- UNHANDLED_MSG_HANDLER \
- }
-
#define STRICT_STFUNC_BODY(HANDLERS) STFUNC_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER)
-#define STRICT_LIGHTFN_BODY(HANDLERS) LIGHTFN_BODY(HANDLERS, STFUNC_STRICT_UNHANDLED_MSG_HANDLER)
#define STRICT_STFUNC(NAME, HANDLERS) \
void NAME(STFUNC_SIG) { \
STRICT_STFUNC_BODY(HANDLERS) \
}
-#define STRICT_LIGHTFN(NAME, HANDLERS) \
- void NAME(LIGHTFN_SIG) { \
- STRICT_LIGHTFN_BODY(HANDLERS) \
- }
-
#define STRICT_STFUNC_EXC(NAME, HANDLERS, EXCEPTION_HANDLERS) \
void NAME(STFUNC_SIG) { \
try { \
@@ -837,7 +723,7 @@ namespace NActors {
STFUNC(State) {
if (DoBeforeReceiving(ev, ctx)) {
- Actor->Receive(ev);
+ Actor->Receive(ev, ctx);
DoAfterReceiving(ctx);
}
}
@@ -934,25 +820,17 @@ namespace NActors {
template <ESendingType SendingType>
bool TActivationContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
- IEventHandle::Forward(ev, recipient);
- return Send(ev);
+ return Send(IEventHandle::Forward(ev, recipient));
}
template <ESendingType SendingType>
bool TActivationContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
- IEventHandle::Forward(ev, recipient);
- return Send(ev);
+ return Send(IEventHandle::Forward(ev, recipient));
}
template <ESendingType SendingType>
bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return Send<SendingType>(new IEventHandleFat(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
- }
-
- template <ESendingType SendingType>
- bool TActorContext::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- ev->PrepareSend(recipient, SelfID, flags, cookie, std::move(traceId));
- return Send<SendingType>(ev);
+ return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
}
template <ESendingType SendingType>
@@ -962,14 +840,12 @@ namespace NActors {
template <ESendingType SendingType>
bool TActorContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const {
- IEventHandle::Forward(ev, recipient);
- return ExecutorThread.Send<SendingType>(ev);
+ return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
}
template <ESendingType SendingType>
bool TActorContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const {
- IEventHandle::Forward(ev, recipient);
- return ExecutorThread.Send<SendingType>(ev);
+ return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
}
template <ESendingType SendingType>
@@ -984,31 +860,7 @@ namespace NActors {
template <ESendingType SendingType>
bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return TActivationContext::Send<SendingType>(new IEventHandleFat(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
- }
-
- template <ESendingType SendingType>
- bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev) const {
- ev->PrepareSend(recipient, *this);
- return TActivationContext::Send<SendingType>(ev);
- }
-
- template <ESendingType SendingType>
- bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags) const {
- ev->PrepareSend(recipient, *this, flags);
- return TActivationContext::Send<SendingType>(ev);
- }
-
- template <ESendingType SendingType>
- bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const {
- ev->PrepareSend(recipient, *this, flags, cookie);
- return TActivationContext::Send<SendingType>(ev);
- }
-
- template <ESendingType SendingType>
- bool TActorIdentity::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- ev->PrepareSend(recipient, *this, flags, cookie, std::move(traceId));
- return TActivationContext::Send<SendingType>(ev);
+ return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
}
template <ESendingType SendingType>
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h
index 37286d48e2..9d89afcf70 100644
--- a/library/cpp/actors/core/actor_bootstrapped.h
+++ b/library/cpp/actors/core/actor_bootstrapped.h
@@ -12,7 +12,7 @@ namespace NActors {
class TActorBootstrapped: public TActor<TDerived> {
protected:
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override {
- return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0);
+ return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parentId, {}, 0);
}
STFUNC(StateBootstrap) {
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index 0fd54ff812..51e21e2950 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -41,9 +41,9 @@ namespace NActors {
}
THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TMonotonic deadline) {
- IEventHandleFat *timeoutEv = nullptr;
+ IEventHandle *timeoutEv = nullptr;
if (deadline != TMonotonic::Max()) {
- TActivationContext::Schedule(deadline, timeoutEv = new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0,
+ TActivationContext::Schedule(deadline, timeoutEv = new IEventHandle(TEvents::TSystem::CoroTimeout, 0,
SelfActorId, {}, nullptr, 0));
}
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index fdf928a803..7a7844cc85 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -57,8 +57,8 @@ namespace NActors {
THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max());
// Wait for specific event set by filter functor. Function returns first event that matches filter. On any other
- // kind of event processUnexpectedEvent() is called.
//
+ // kind of event processUnexpectedEvent() is called.
// Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; });
template <typename TFunc, typename TCallback, typename = std::enable_if_t<std::is_invocable_v<TCallback, TAutoPtr<IEventHandle>>>>
THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TCallback processUnexpectedEvent, TMonotonic deadline = TMonotonic::Max()) {
@@ -117,8 +117,7 @@ namespace NActors {
bool Send(TAutoPtr<IEventHandle> ev);
bool Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
- IEventHandle::Forward(ev, recipient);
- return Send(ev.Release());
+ return Send(IEventHandle::Forward(ev, recipient).Release());
}
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) {
@@ -166,7 +165,7 @@ namespace NActors {
~TActorCoro();
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override {
- return new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0);
+ return new IEventHandle(TEvents::TSystem::Bootstrap, 0, self, parent, {}, 0);
}
private:
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index fe492e531e..86b19af546 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
ctx.RegisterWithSameMailbox(new TDummyActor());
}
if (Role == ERole::Leader) {
- TAutoPtr<IEventHandle> ev = new IEventHandleFat(Receiver, SelfId(), new TEvents::TEvPing());
+ TAutoPtr<IEventHandle> ev = new IEventHandle(Receiver, SelfId(), new TEvents::TEvPing());
SpecialSend(ev, ctx);
}
}
@@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
if (AllocatesMemory) {
- SpecialSend(new IEventHandleFat(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx);
+ SpecialSend(new IEventHandle(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx);
} else {
std::swap(*const_cast<TActorId*>(&ev->Sender), *const_cast<TActorId*>(&ev->Recipient));
ev->DropRewrite();
@@ -541,14 +541,14 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
{
}
- bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
+ bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) override {
*Counter += 1;
if (ev->Type != TEvents::THelloWorld::Pong) {
- TAutoPtr<IEventHandle> pingEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPing());
+ TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing());
SavedEvent = ev;
- Actor->Receive(pingEv);
+ Actor->Receive(pingEv, ctx);
} else {
- Actor->Receive(SavedEvent);
+ Actor->Receive(SavedEvent, ctx);
}
return false;
}
@@ -566,7 +566,7 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
*Counter += 1;
if (ev->Type == TEvents::THelloWorld::Ping) {
- TAutoPtr<IEventHandle> pongEv = new IEventHandleFat(SelfId(), SelfId(), new TEvents::TEvPong());
+ TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong());
Send(SelfId(), new TEvents::TEvPong());
return false;
}
@@ -701,7 +701,7 @@ Y_UNIT_TEST_SUITE(TestStateFunc) {
auto sender = runtime.AllocateEdgeActor();
auto testActor = runtime.Register(new TTestActorWithExceptionsStateFunc());
for (ui64 tag = 0; tag < 4; ++tag) {
- runtime.Send(new IEventHandleFat(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true);
+ runtime.Send(new IEventHandle(testActor, sender, new TEvents::TEvWakeup(tag)), 0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Tag, tag);
}
diff --git a/library/cpp/actors/core/actor_virtual.h b/library/cpp/actors/core/actor_virtual.h
index 1306252b58..c9c34c4729 100644
--- a/library/cpp/actors/core/actor_virtual.h
+++ b/library/cpp/actors/core/actor_virtual.h
@@ -8,7 +8,7 @@ template <class TEvent>
class TEventContext {
private:
TEvent* Event;
- std::unique_ptr<IEventHandleFat> Handle;
+ std::unique_ptr<IEventHandle> Handle;
public:
const TEvent* operator->() const {
return Event;
@@ -16,7 +16,7 @@ public:
const IEventHandle& GetHandle() const {
return *Handle;
}
- TEventContext(std::unique_ptr<IEventHandleFat> handle)
+ TEventContext(std::unique_ptr<IEventHandle> handle)
: Handle(std::move(handle))
{
Y_VERIFY_DEBUG(dynamic_cast<TEvent*>(Handle->GetBase()));
@@ -28,7 +28,7 @@ public:
template <class TEvent, class TExpectedActor>
class IEventForActor: public IEventBase {
protected:
- virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override {
+ virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override {
Y_VERIFY_DEBUG(dynamic_cast<TExpectedActor*>(actor));
auto* actorCorrect = static_cast<TExpectedActor*>(actor);
TEventContext<TEvent> context(std::move(eventPtr));
@@ -41,7 +41,7 @@ public:
template <class TBaseEvent, class TEvent, class TExpectedObject>
class IEventForAnything: public TBaseEvent {
protected:
- virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) override {
+ virtual bool DoExecute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) override {
auto* objImpl = dynamic_cast<TExpectedObject*>(actor);
if (!objImpl) {
return false;
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index b33861a966..9a8a0abd01 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -74,7 +74,7 @@ namespace NActors {
if (recpNodeId != NodeId && recpNodeId != 0) {
// if recipient is not local one - rewrite with forward instruction
- //Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable());
+ Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable());
Y_VERIFY(ev->Recipient == recipient,
"Event rewrite from %s to %s would be lost via interconnect",
ev->Recipient.ToString().c_str(),
@@ -96,7 +96,7 @@ namespace NActors {
}
if (target != actorId) {
// a race has occured, terminate newly created actor
- Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
+ Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
}
}
recipient = target;
@@ -122,7 +122,7 @@ namespace NActors {
bool TActorSystem::GenericSend<&IExecutorPool::SpecificSend>(TAutoPtr<IEventHandle> ev) const;
bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie) const {
- return this->Send(new IEventHandleFat(recipient, DefSelfID, ev, flags, cookie));
+ return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie));
}
bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev) const {
@@ -140,14 +140,6 @@ namespace NActors {
}
}
- bool TActorSystem::Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags, ui64 cookie) const {
- return this->Send(ev->PrepareSend(recipient, DefSelfID, flags, cookie));
- }
-
- bool TActorSystem::Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags, ui64 cookie) const {
- return this->Send(ev->PrepareSend(recipient, sender, flags, cookie));
- }
-
void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
Schedule(deadline - Timestamp(), ev, cookie);
}
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 0bc788f9be..b9ecaa2b9e 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -204,8 +204,6 @@ namespace NActors {
bool SpecificSend(TAutoPtr<IEventHandle> ev) const;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const;
- bool Send(const TActorId& recipient, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const;
- bool Send(const TActorId& recipient, const TActorId& sender, IEventHandleLight* ev, ui32 flags = 0, ui64 cookie = 0) const;
/**
* Schedule one-shot event that will be send at given time point in the future.
diff --git a/library/cpp/actors/core/ask.cpp b/library/cpp/actors/core/ask.cpp
index e3fe6d9ee3..40c6748d56 100644
--- a/library/cpp/actors/core/ask.cpp
+++ b/library/cpp/actors/core/ask.cpp
@@ -47,9 +47,9 @@ namespace NActors {
if (ev->GetTypeRewrite() == TTimeout::EventType) {
Promise_.SetException(std::make_exception_ptr(yexception() << "ask timeout"));
} else if (!ExpectedEventType_ || ev->GetTypeRewrite() == ExpectedEventType_) {
- Promise_.SetValue(IEventHandleFat::GetFat(ev.Get())->ReleaseBase());
+ Promise_.SetValue(ev.Get()->ReleaseBase());
} else {
- Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << IEventHandleFat::GetFat(ev.Get())->GetBase()->ToString()));
+ Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << ev.Get()->GetBase()->ToString()));
}
PassAway();
diff --git a/library/cpp/actors/core/av_bootstrapped.cpp b/library/cpp/actors/core/av_bootstrapped.cpp
index e112302792..771177242e 100644
--- a/library/cpp/actors/core/av_bootstrapped.cpp
+++ b/library/cpp/actors/core/av_bootstrapped.cpp
@@ -7,7 +7,7 @@ public:
};
TAutoPtr<NActors::IEventHandle> TActorAutoStart::AfterRegister(const TActorId& self, const TActorId& parentId) {
- return new IEventHandleFat(self, parentId, new TEventForStart, 0);
+ return new IEventHandle(self, parentId, new TEventForStart, 0);
}
void TActorAutoStart::ProcessEvent(TEventContext<TEventForStart>& ev) {
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp
index b963dfb4ea..6ffe42f65b 100644
--- a/library/cpp/actors/core/event.cpp
+++ b/library/cpp/actors/core/event.cpp
@@ -7,79 +7,19 @@ namespace NActors {
Max<ui64>(), Max<ui64>()
};
- TAutoPtr<IEventHandle>& IEventHandle::Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient) {
- if (ev->IsEventLight()) {
- IEventHandleLight::GetLight(ev.Get())->Forward(recipient);
- } else {
- ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient);
- }
- return ev;
- }
-
- THolder<IEventHandle>& IEventHandle::Forward(THolder<IEventHandle>& ev, TActorId recipient) {
- if (ev->IsEventLight()) {
- IEventHandleLight::GetLight(ev.Get())->Forward(recipient);
- } else {
- ev = IEventHandleFat::GetFat(ev.Get())->Forward(recipient);
- }
- return ev;
- }
-
TString IEventHandle::GetTypeName() const {
- if (IsEventFat()) {
- auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this));
- return ev->HasEvent() ? TypeName(*(ev->GetBase())) : TypeName(*this);
- } else {
- return TypeName(*this);
- }
+ return HasEvent() ? TypeName(*(const_cast<IEventHandle*>(this)->GetBase())) : TypeName(*this);
}
TString IEventHandle::ToString() const {
- if (IsEventFat()) {
- auto* ev = const_cast<IEventHandleFat*>(static_cast<const IEventHandleFat*>(this));
- return ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?";
- } else {
- // TODO(xenoxeno):
- return TypeName(*this);
- }
+ return HasEvent() ? const_cast<IEventHandle*>(this)->GetBase()->ToString().data() : "serialized?";
}
- bool IEventHandle::HasEvent() const {
- if (IsEventLight()) {
- return true;
- } else {
- return IEventHandleFat::GetFat(this)->HasEvent();
- }
+ std::unique_ptr<IEventHandle> IEventHandle::Forward(std::unique_ptr<IEventHandle>&& ev, TActorId recipient) {
+ return std::unique_ptr<IEventHandle>(ev->Forward(recipient).Release());
}
- bool IEventHandle::HasBuffer() const {
- if (IsEventLight()) {
- return false;
- } else {
- return IEventHandleFat::GetFat(this)->HasBuffer();
- }
- }
-
- TActorId IEventHandle::GetForwardOnNondeliveryRecipient() const {
- if (IsEventLight()) {
- return {};
- } else {
- return IEventHandleFat::GetFat(this)->GetForwardOnNondeliveryRecipient();
- }
- }
-
- size_t IEventHandle::GetSize() const {
- if (IsEventLight()) {
- if (IsEventSerializable()) {
- return IEventHandleLightSerializable::GetLightSerializable(this)->GetSize();
- }
- } else {
- return IEventHandleFat::GetFat(this)->GetSize();
- }
- return 0;
- }
-
- TIntrusivePtr<TEventSerializedData> IEventHandleFat::ReleaseChainBuffer() {
+ TIntrusivePtr<TEventSerializedData> IEventHandle::ReleaseChainBuffer() {
if (Buffer) {
TIntrusivePtr<TEventSerializedData> result;
DoSwap(result, Buffer);
@@ -96,7 +36,7 @@ namespace NActors {
return new TEventSerializedData;
}
- TIntrusivePtr<TEventSerializedData> IEventHandleFat::GetChainBuffer() {
+ TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() {
if (Buffer) {
return Buffer;
}
@@ -108,6 +48,4 @@ namespace NActors {
}
return new TEventSerializedData;
}
-
- std::vector<std::vector<IEventFactory*>*> TEventFactories::EventFactories;
}
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 71c29082d8..5c2cc726ef 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -9,7 +9,6 @@
#include <util/system/hp_timer.h>
#include <util/generic/maybe.h>
-#include <util/string/builder.h>
namespace NActors {
class TChunkSerializer;
@@ -24,7 +23,7 @@ namespace NActors {
public ISerializerToStream {
protected:
// for compatibility with virtual actors
- virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandleFat> /*eventPtr*/) {
+ virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandle> /*eventPtr*/) {
Y_VERIFY_DEBUG(false);
return false;
}
@@ -34,7 +33,7 @@ namespace NActors {
virtual ~IEventBase() {
}
- bool Execute(IActor* actor, std::unique_ptr<IEventHandleFat> eventPtr) {
+ bool Execute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) {
return DoExecute(actor, std::move(eventPtr));
}
@@ -54,70 +53,8 @@ namespace NActors {
virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
};
- struct IEventHandleFields {
- enum EFlags : ui32 {
- FlagTrackDelivery = 1 << 0,
- FlagForwardOnNondelivery = 1 << 1,
- FlagSubscribeOnSession = 1 << 2,
- FlagUseSubChannel = 1 << 3,
- FlagGenerateUnsureUndelivered = 1 << 4,
- FlagExtendedFormat = 1 << 5,
- FlagLight = 1 << 18,
- FlagSerializable = 1 << 19,
- };
-
- static constexpr ui32 STICKY_FLAGS = (FlagLight | FlagSerializable);
-
- TActorId Recipient = {};
- TActorId Sender = {};
- ui32 Type = {};
-
-#pragma pack(push, 1)
- union {
- ui32 Flags = {};
- struct {
- ui32 NormalFlags:6;
- ui32 ReservedFlags:12;
- ui32 StickyFlags:2;
- ui32 Channel:12;
- };
- struct {
- // flags
- ui32 TrackDelivery:1;
- ui32 ForwardOnNondeliveryFlag:1;
- ui32 SubscribeOnSession:1;
- ui32 UseSubChannel:1;
- ui32 GenerateUnsureUndelivered:1;
- ui32 ExtendedFormat:1;
- // reserved
- ui32 Reserved:12;
- // sticky flags
- ui32 Light:1;
- ui32 Serializable:1;
- };
- };
-#pragma pack(pop)
-
- ui64 Cookie = {};
-
- TActorId RewriteRecipient = {};
- ui32 RewriteType = {};
-
- NWilson::TTraceId TraceId = {};
-
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- ::NHPTimer::STime SendTime = 0;
-#endif
- };
-
- class IEventHandle : public IEventHandleFields {
- public:
- IEventHandle(IEventHandleFields&& fields = {})
- : IEventHandleFields(std::move(fields))
- {}
-
- virtual ~IEventHandle() = default;
-
+ // fat handle
+ class IEventHandle : TNonCopyable {
struct TOnNondelivery {
TActorId Recipient;
@@ -127,143 +64,29 @@ namespace NActors {
}
};
- THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
-
- ui16 GetChannel() const noexcept {
- return Channel;
- }
-
- ui64 GetSubChannel() const noexcept {
- return UseSubChannel ? Sender.LocalId() : 0ULL;
- }
-
- // deprecate(xenoxeno) ?
- static const size_t ChannelBits = 12;
- static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
-
- static ui32 MakeFlags(ui32 channel, ui32 flags) {
- Y_VERIFY(channel < (1 << ChannelBits));
- Y_VERIFY(flags < (1 << ChannelShift));
- return (flags | (channel << ChannelShift));
- }
- //
-
- void SetFlags(ui32 flags) {
- Flags = (flags & ~STICKY_FLAGS) | (Flags & STICKY_FLAGS);
- }
-
- const TActorId& GetRecipientRewrite() const {
- return RewriteRecipient;
- }
-
- void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
- RewriteRecipient = recipientRewrite;
- RewriteType = typeRewrite;
- }
-
- void DropRewrite() {
- RewriteRecipient = Recipient;
- RewriteType = Type;
- }
-
- ui32 GetTypeRewrite() const {
- return RewriteType;
- }
-
- bool IsEventLight() const {
- return Light;
- }
-
- bool IsEventFat() const {
- return !Light;
- }
-
- bool IsEventSerializable() const {
- return Serializable;
- }
-
- template<typename TEventType>
- TEventType* Get();
- template<typename TEventType>
- TEventType* CastAsLocal();
- template<typename TEventType>
- TEventType* StaticCastAsLocal();
- bool HasEvent() const;
- bool HasBuffer() const;
- TString GetTypeName() const;
- TString ToString() const;
- size_t GetSize() const;
- static TAutoPtr<IEventHandle>& Forward(TAutoPtr<IEventHandle>& ev, TActorId recipient);
- static THolder<IEventHandle>& Forward(THolder<IEventHandle>& ev, TActorId recipient);
- static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure = false);
- static TAutoPtr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure = false);
- template<typename TEventType>
- static TEventType* Release(TAutoPtr<IEventHandle>&);
- template<typename TEventType>
- static TEventType* Release(THolder<IEventHandle>&);
- template<typename TEventType>
- static TEventType* Release(std::unique_ptr<IEventHandle>&);
-
- template<typename TEventTypeSmartPtr>
- static TAutoPtr<IEventHandle> ForwardOnNondelivery(TEventTypeSmartPtr& ev, ui32 reason, bool unsure = false) {
- TAutoPtr<IEventHandle> evi(ev.Release());
- return ForwardOnNondelivery(evi, reason, unsure);
- }
-
- TActorId GetForwardOnNondeliveryRecipient() const;
- };
-
- // fat handle
- class IEventHandleFat : public IEventHandle, TNonCopyable {
public:
template <typename TEv>
- inline TEv* CastAsLocal() const noexcept { // cast with event check
+ inline TEv* CastAsLocal() const noexcept {
auto fits = GetTypeRewrite() == TEv::EventType;
- constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());};
- if constexpr (couldBeCasted) {
- return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
- } else {
- Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data());
- }
- }
- template <typename TEv>
- inline TEv* StaticCastAsLocal() const noexcept { // blind cast
- constexpr bool couldBeCasted = requires() {static_cast<TEv*>(Event.Get());};
- if constexpr (couldBeCasted) {
- return static_cast<TEv*>(Event.Get());
- } else {
- Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEv>().data());
- }
+ return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
}
template <typename TEventType>
TEventType* Get() {
if (Type != TEventType::EventType)
- Y_FAIL("%s", (TStringBuilder()
- << "Event type " << Type
- << " doesn't match the expected type " << TEventType::EventType
- << " requested typename " << TypeName<TEventType>()
- << " actual typename " << GetTypeName()).data());
-
- constexpr bool couldBeGot = requires() {
- Event.Reset(TEventType::Load(Buffer.Get()));
- static_cast<TEventType*>(Event.Get());
- };
- if constexpr (couldBeGot) {
- if (!Event) {
- static TEventSerializedData empty;
- Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty));
- }
-
- if (Event) {
- return static_cast<TEventType*>(Event.Get());
- }
-
- Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
- } else {
- Y_FAIL("Event type %" PRIu32 " couldn't be get as type %s", Type, TypeName<TEventType>().data());
+ Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
+
+ if (!Event) {
+ static TEventSerializedData empty;
+ Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty));
+ }
+
+ if (Event) {
+ return static_cast<TEventType*>(Event.Get());
}
+
+ Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
}
template <typename T>
@@ -274,43 +97,104 @@ namespace NActors {
return x;
}
+ enum EFlags {
+ FlagTrackDelivery = 1 << 0,
+ FlagForwardOnNondelivery = 1 << 1,
+ FlagSubscribeOnSession = 1 << 2,
+ FlagUseSubChannel = 1 << 3,
+ FlagGenerateUnsureUndelivered = 1 << 4,
+ FlagExtendedFormat = 1 << 5,
+ };
+
+ const ui32 Type;
+ const ui32 Flags;
+ const TActorId Recipient;
+ TActorId Sender;
+ const ui64 Cookie;
const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect
+ // if set, used by ActorSystem/Interconnect to report tracepoints
+ NWilson::TTraceId TraceId;
+
// filled if feeded by interconnect session
const TActorId InterconnectSession;
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ ::NHPTimer::STime SendTime;
+#endif
+
+ static const size_t ChannelBits = 12;
+ static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
#ifdef USE_ACTOR_CALLSTACK
TCallstack Callstack;
#endif
+ ui16 GetChannel() const noexcept {
+ return Flags >> ChannelShift;
+ }
+
+ ui64 GetSubChannel() const noexcept {
+ return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL;
+ }
+
+ static ui32 MakeFlags(ui32 channel, ui32 flags) {
+ Y_VERIFY(channel < (1 << ChannelBits));
+ Y_VERIFY(flags < (1 << ChannelShift));
+ return (flags | (channel << ChannelShift));
+ }
+
private:
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
+ TActorId RewriteRecipient;
+ ui32 RewriteType;
+
+ THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
+
public:
+ void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
+ RewriteRecipient = recipientRewrite;
+ RewriteType = typeRewrite;
+ }
+
+ void DropRewrite() {
+ RewriteRecipient = Recipient;
+ RewriteType = Type;
+ }
+
+ const TActorId& GetRecipientRewrite() const {
+ return RewriteRecipient;
+ }
+
+ ui32 GetTypeRewrite() const {
+ return RewriteType;
+ }
+
TActorId GetForwardOnNondeliveryRecipient() const {
return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
}
- IEventHandleFat(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
+ IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
- : IEventHandle({
- .Recipient = recipient,
- .Sender = sender,
- .Type = ev->Type(),
- .Flags = flags,
- .Cookie = cookie,
- .RewriteRecipient = recipient,
- .RewriteType = ev->Type(),
- .TraceId = std::move(traceId),
- })
+ : Type(ev->Type())
+ , Flags(flags)
+ , Recipient(recipient)
+ , Sender(sender)
+ , Cookie(cookie)
+ , TraceId(std::move(traceId))
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ , SendTime(0)
+#endif
, Event(ev)
+ , RewriteRecipient(Recipient)
+ , RewriteType(Type)
{
if (forwardOnNondelivery)
OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
}
- IEventHandleFat(ui32 type,
+ IEventHandle(ui32 type,
ui32 flags,
const TActorId& recipient,
const TActorId& sender,
@@ -318,24 +202,25 @@ namespace NActors {
ui64 cookie,
const TActorId* forwardOnNondelivery = nullptr,
NWilson::TTraceId traceId = {})
- : IEventHandle({
- .Recipient = recipient,
- .Sender = sender,
- .Type = type,
- .Flags = flags,
- .Cookie = cookie,
- .RewriteRecipient = recipient,
- .RewriteType = type,
- .TraceId = std::move(traceId),
- })
+ : Type(type)
+ , Flags(flags)
+ , Recipient(recipient)
+ , Sender(sender)
+ , Cookie(cookie)
+ , TraceId(std::move(traceId))
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ , SendTime(0)
+#endif
, Buffer(std::move(buffer))
+ , RewriteRecipient(Recipient)
+ , RewriteType(Type)
{
if (forwardOnNondelivery)
OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
}
// Special ctor for events from interconnect.
- IEventHandleFat(const TActorId& session,
+ IEventHandle(const TActorId& session,
ui32 type,
ui32 flags,
const TActorId& recipient,
@@ -344,19 +229,20 @@ namespace NActors {
ui64 cookie,
TScopeId originScopeId,
NWilson::TTraceId traceId) noexcept
- : IEventHandle({
- .Recipient = recipient,
- .Sender = sender,
- .Type = type,
- .Flags = flags,
- .Cookie = cookie,
- .RewriteRecipient = recipient,
- .RewriteType = type,
- .TraceId = std::move(traceId),
- })
+ : Type(type)
+ , Flags(flags)
+ , Recipient(recipient)
+ , Sender(sender)
+ , Cookie(cookie)
, OriginScopeId(originScopeId)
+ , TraceId(std::move(traceId))
, InterconnectSession(session)
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ , SendTime(0)
+#endif
, Buffer(std::move(buffer))
+ , RewriteRecipient(Recipient)
+ , RewriteType(Type)
{
}
@@ -401,267 +287,63 @@ namespace NActors {
TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
if (Event)
- return new IEventHandleFat(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
+ return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
else
- return new IEventHandleFat(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
+ return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
}
- static TAutoPtr<IEventHandleFat> MakeFat(TAutoPtr<IEventHandle> ev) {
- if (ev->IsEventLight()) {
- Y_FAIL("Can't make light event fat");
- } else {
- TAutoPtr<IEventHandleFat> evb(static_cast<IEventHandleFat*>(ev.Release()));
- return evb;
- }
- }
+ TString GetTypeName() const;
+ TString ToString() const;
- static IEventHandleFat* GetFat(IEventHandle* ev) {
- if (ev->IsEventLight()) {
- Y_FAIL("Can't make light event fat");
- } else {
- return static_cast<IEventHandleFat*>(ev);
- }
- }
+ [[nodiscard]] static std::unique_ptr<IEventHandle> Forward(std::unique_ptr<IEventHandle>&& ev, TActorId recipient);
+ [[nodiscard]] static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>&& ev, ui32 reason, bool unsure = false);
- static const IEventHandleFat* GetFat(const IEventHandle* ev) {
- if (ev->IsEventLight()) {
- Y_FAIL("Can't make light event fat");
- } else {
- return static_cast<const IEventHandleFat*>(ev);
- }
+ [[nodiscard]] static TAutoPtr<IEventHandle> Forward(TAutoPtr<IEventHandle>&& ev, TActorId recipient) {
+ return Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release();
}
- static IEventHandleFat* GetFat(TAutoPtr<IEventHandle>& ev) {
- return GetFat(ev.Get());
+ [[nodiscard]] static THolder<IEventHandle> Forward(THolder<IEventHandle>&& ev, TActorId recipient) {
+ return THolder(Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release());
}
- static IEventHandleFat* GetFat(THolder<IEventHandle>& ev) {
- return GetFat(ev.Get());
+ [[nodiscard]] static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
+ return ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release();
}
- static IEventHandleFat* GetFat(std::unique_ptr<IEventHandle>& ev) {
- return GetFat(ev.get());
+ [[nodiscard]] static THolder<IEventHandle> ForwardOnNondelivery(THolder<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
+ return THolder(ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release());
}
- static TAutoPtr<IEventHandle> MakeBase(TAutoPtr<IEventHandleFat> ev) {
- return ev.Release();
+ template<typename T>
+ static TAutoPtr<T> Release(TAutoPtr<IEventHandle>& ev) {
+ return ev->Release<T>();
}
- static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure = false);
- };
-
- template <typename TEventType>
- class TEventHandleFat: public IEventHandleFat {
- TEventHandleFat(); // we never made instance of TEventHandleFat
- public:
- TEventType* Get() {
- return IEventHandleFat::Get<TEventType>();
+ template<typename T>
+ static TAutoPtr<T> Release(THolder<IEventHandle>& ev) {
+ return ev->Release<T>();
}
- TAutoPtr<TEventType> Release() {
- return IEventHandleFat::Release<TEventType>();
+ template <typename TEv>
+ inline TEv* StaticCastAsLocal() const noexcept { // blind cast
+ return static_cast<TEv*>(Event.Get());
}
};
- static_assert(sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat), "expect sizeof(TEventHandleFat<IEventBase>) == sizeof(IEventHandleFat)");
-
- // light handle
- class IEventHandleLight : public IEventHandle {
+ template <typename TEventType>
+ class TEventHandle: public IEventHandle {
+ TEventHandle(); // we never made instance of TEventHandle
public:
- IEventHandleLight(ui32 type) {
- RewriteType = Type = type;
- Light = true;
- }
-
- IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender) {
- RewriteRecipient = Recipient = recipient;
- Sender = sender;
- return this;
- }
-
- IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags) {
- RewriteRecipient = Recipient = recipient;
- Sender = sender;
- SetFlags(flags);
- return this;
- }
-
- IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie) {
- RewriteRecipient = Recipient = recipient;
- Sender = sender;
- SetFlags(flags);
- Cookie = cookie;
- return this;
- }
-
- IEventHandleLight* PrepareSend(TActorId recipient, TActorId sender, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) {
- RewriteRecipient = Recipient = recipient;
- Sender = sender;
- SetFlags(flags);
- Cookie = cookie;
- TraceId = std::move(traceId);
- return this;
- }
-
- void Forward(const TActorId& dest) {
- RewriteRecipient = dest;
- }
-
- static IEventHandleLight* GetLight(IEventHandle* ev) {
- if (ev->IsEventLight()) {
- return static_cast<IEventHandleLight*>(ev);
- } else {
- Y_FAIL("Can't make fat event light");
- }
- }
-
- static IEventHandleLight* GetLight(TAutoPtr<IEventHandle>& ev) {
- return GetLight(ev.Get());
- }
-
- static IEventHandleLight* GetLight(THolder<IEventHandle>& ev) {
- return GetLight(ev.Get());
- }
-
- static IEventHandleLight* GetLight(std::unique_ptr<IEventHandle>& ev) {
- return GetLight(ev.get());
- }
-
- static IEventHandleLight* ReleaseLight(TAutoPtr<IEventHandle>& ev) {
- return GetLight(ev.Release());
- }
-
- static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure = false);
-
- template<typename TEventType>
TEventType* Get() {
- if (Type != TEventType::EventType) {
- Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
- }
- constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);};
- if constexpr (couldBeConverted) {
- return static_cast<TEventType*>(this);
- } else {
- Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data());
- }
- }
-
- template<typename TEventType>
- TEventType* CastAsLocal() {
- constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);};
- if constexpr (couldBeConverted) {
- if (Type == TEventType::EventType) {
- return static_cast<TEventType*>(this);
- }
- }
- return nullptr;
+ return IEventHandle::Get<TEventType>();
}
- template<typename TEventType>
- TEventType* StaticCastAsLocal() {
- constexpr bool couldBeConverted = requires() {static_cast<TEventType*>(this);};
- if constexpr (couldBeConverted) {
- return static_cast<TEventType*>(this);
- }
- Y_FAIL("Event type %" PRIu32 " couldn't be converted to type %s", Type, TypeName<TEventType>().data());
+ TAutoPtr<TEventType> Release() {
+ return IEventHandle::Release<TEventType>();
}
};
- template<typename TEventType>
- TEventType* IEventHandle::Get() {
- if (IsEventLight()) {
- return IEventHandleLight::GetLight(this)->Get<TEventType>();
- } else {
- return IEventHandleFat::GetFat(this)->Get<TEventType>();
- }
- }
-
- template<typename TEventType>
- TEventType* IEventHandle::CastAsLocal() {
- if (IsEventLight()) {
- return IEventHandleLight::GetLight(this)->CastAsLocal<TEventType>();
- } else {
- return IEventHandleFat::GetFat(this)->CastAsLocal<TEventType>();
- }
- }
-
- template<typename TEventType>
- TEventType* IEventHandle::StaticCastAsLocal() {
- if (IsEventLight()) {
- return IEventHandleLight::GetLight(this)->StaticCastAsLocal<TEventType>();
- } else {
- return IEventHandleFat::GetFat(this)->StaticCastAsLocal<TEventType>();
- }
- }
-
- template<typename TEventType>
- TEventType* IEventHandle::Release(TAutoPtr<IEventHandle>& ev) {
- if (ev->IsEventLight()) {
- return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>();
- } else {
- return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release();
- }
- }
-
- template<typename TEventType>
- TEventType* IEventHandle::Release(THolder<IEventHandle>& ev) {
- if (ev->IsEventLight()) {
- return IEventHandleLight::GetLight(ev.Release())->CastAsLocal<TEventType>();
- } else {
- return IEventHandleFat::GetFat(ev.Get())->Release<TEventType>().Release();
- }
- }
-
- template<typename TEventType>
- TEventType* IEventHandle::Release(std::unique_ptr<IEventHandle>& ev) {
- if (ev->IsEventLight()) {
- return IEventHandleLight::GetLight(ev.release())->CastAsLocal<TEventType>();
- } else {
- return IEventHandleFat::GetFat(ev.get())->Release<TEventType>().Release();
- }
- }
-
-
- class IEventHandleLightSerializable;
-
- using TEventSerializer = std::function<bool(const IEventHandleLightSerializable*, TChunkSerializer*)>;
-
- class IEventHandleLightSerializable : public IEventHandleLight {
- public:
- TEventSerializer Serializer;
-
- IEventHandleLightSerializable(ui32 type, TEventSerializer serializer)
- : IEventHandleLight(type)
- , Serializer(std::move(serializer))
- {
- Serializable = true;
- }
-
- static IEventHandleLightSerializable* GetLightSerializable(IEventHandle* ev) {
- if (ev->IsEventSerializable()) {
- return static_cast<IEventHandleLightSerializable*>(ev);
- } else {
- Y_FAIL("Can't make serializable event");
- }
- }
-
- static const IEventHandleLightSerializable* GetLightSerializable(const IEventHandle* ev) {
- if (ev->IsEventSerializable()) {
- return static_cast<const IEventHandleLightSerializable*>(ev);
- } else {
- Y_FAIL("Can't make serializable event");
- }
- }
-
- static IEventHandleLightSerializable* GetLightSerializable(TAutoPtr<IEventHandle>& ev) {
- return GetLightSerializable(ev.Get());
- }
-
- size_t GetSize() const {
- // TODO(xenoxeno)
- return 0;
- }
- };
+ static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)");
template <typename TEventType, ui32 EventType0>
class TEventBase: public IEventBase {
@@ -672,7 +354,7 @@ namespace NActors {
}
// still abstract
- typedef TEventHandleFat<TEventType> THandle;
+ typedef TEventHandle<TEventType> THandle;
typedef TAutoPtr<THandle> TPtr;
};
@@ -681,7 +363,7 @@ namespace NActors {
return TString(header); \
} \
bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
- Y_FAIL("Local event %s is not serializable", TypeName(*this).data()); \
+ Y_FAIL("Local event " #eventType " is not serializable"); \
} \
static IEventBase* Load(NActors::TEventSerializedData*) { \
Y_FAIL("Local event " #eventType " has no load method"); \
@@ -703,39 +385,4 @@ namespace NActors {
bool IsSerializable() const override { \
return true; \
}
-
-
- struct TEventMeta {
- TActorId Session;
- ui32 Type;
- ui32 Flags;
- TActorId Recipient;
- TActorId Sender;
- ui64 Cookie;
- TScopeId OriginScopeId;
- NWilson::TTraceId TraceId;
- TRope Data;
-
- bool IsExtendedFormat() const {
- return Flags & IEventHandle::FlagExtendedFormat;
- }
- };
-
- inline constexpr ui32 GetEventSpace(ui32 eventType) {
- return (eventType >> 16u);
- }
-
- inline constexpr ui32 GetEventSubType(ui32 eventType) {
- return (eventType & (0xffff));
- }
-
- struct IEventFactory {
- virtual IEventHandle* Construct(const TEventMeta& eventMeta) = 0;
- virtual void Destruct(IEventHandle*) = 0;
- };
-
- struct TEventFactories {
- // it supposed to be statically allocated in the begining
- static std::vector<std::vector<IEventFactory*>*> EventFactories; // [EventSpace][EventSubType]
- };
}
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index b845760221..30cc26aa46 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -7,7 +7,7 @@
#include <library/cpp/actors/wilson/wilson_trace.h>
namespace NActors {
- class IEventHandleFat;
+ class IEventHandle;
struct TConstIoVec {
const void* Data;
diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h
index c5fff6af35..2845aa94dd 100644
--- a/library/cpp/actors/core/event_local.h
+++ b/library/cpp/actors/core/event_local.h
@@ -71,21 +71,4 @@ namespace NActors {
return new TEv();
}
};
-
- template<typename TEv, ui32 EventType0>
- class TEventLight : public IEventHandleLight {
- public:
- static constexpr ui32 EventType = EventType0;
-
- TEventLight()
- : IEventHandleLight(EventType)
- {}
-
- TEv* Get() {
- return static_cast<TEv*>(this);
- }
-
- using THandle = TEv;
- using TPtr = TAutoPtr<TEv>;
- };
}
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index d56a9b91de..8fd43aa34d 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -513,253 +513,4 @@ namespace NActors {
dest->SetRawX1(src.RawX1());
dest->SetRawX2(src.RawX2());
}
-
- template<ui32 EventSpace>
- struct TLightEventSpaceFactories {
- public:
- static std::vector<IEventFactory*>& GetEventSpaceFactories(ui32 eventSpace) {
- if (TEventFactories::EventFactories.size() <= eventSpace) {
- // it's only safe to do BEFORE initialization of ActorSystem
- TEventFactories::EventFactories.resize(eventSpace + 1);
- }
- if (TEventFactories::EventFactories[eventSpace] == nullptr) {
- TEventFactories::EventFactories[eventSpace] = new std::vector<IEventFactory*>();
- }
- return *TEventFactories::EventFactories[eventSpace];
- }
-
- static void SetEventFactory(ui32 eventType, IEventFactory* factory) {
- Y_VERIFY(GetEventSpace(eventType) == EventSpace);
- auto eventSpaceFactories = GetEventSpaceFactories(GetEventSpace(eventType));
- if (eventSpaceFactories.size() <= GetEventSubType(eventType)) {
- // it's only safe to do BEFORE initialization of ActorSystem
- eventSpaceFactories.resize(GetEventSubType(eventType) + 1);
- }
- eventSpaceFactories[GetEventSubType(eventType)] = factory;
- }
- };
-
- template<typename TEvent>
- class TLightEventFactory : public IEventFactory, TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)> {
- private:
- mutable size_t CachedByteSize = 0;
-
- static constexpr char PayloadMarker = 0x07;
- static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
-
- static size_t SerializeNumber(size_t num, char *buffer) {
- char *begin = buffer;
- do {
- *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00);
- num >>= 7;
- } while (num);
- return buffer - begin;
- }
-
- static size_t DeserializeNumber(const char **ptr, const char *end) {
- const char *p = *ptr;
- size_t res = 0;
- size_t offset = 0;
- for (;;) {
- if (p == end) {
- return Max<size_t>();
- }
- const char byte = *p++;
- res |= (static_cast<size_t>(byte) & 0x7F) << offset;
- offset += 7;
- if (!(byte & 0x80)) {
- break;
- }
- }
- *ptr = p;
- return res;
- }
-
- static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) {
- size_t res = 0;
- size_t offset = 0;
- for (;;) {
- if (!iter.Valid()) {
- return Max<size_t>();
- }
- const char byte = *iter.ContiguousData();
- iter += 1;
- --size;
- res |= (static_cast<size_t>(byte) & 0x7F) << offset;
- offset += 7;
- if (!(byte & 0x80)) {
- break;
- }
- }
- return res;
- }
-
- public:
- void DeserializeProtoEvent(TEvent* event, const TEventMeta& eventMeta) {
- TRope::TConstIterator iter = eventMeta.Data.Begin();
- ui64 size = eventMeta.Data.GetSize();
- if (eventMeta.IsExtendedFormat()) {
- constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;};
- if constexpr (hasPayload) {
- // check marker
- if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
- Y_FAIL("Invalid event marker");
- }
- // skip marker
- iter += 1;
- --size;
- // parse number of payload ropes
- size_t numRopes = DeserializeNumber(iter, size);
- if (numRopes == Max<size_t>()) {
- Y_FAIL("Invalid event rope number");
- }
- while (numRopes--) {
- // parse length of the rope
- const size_t len = DeserializeNumber(iter, size);
- if (len == Max<size_t>() || size < len) {
- Y_FAIL("%s", (TStringBuilder() << "Invalid event len# " << len << " size# " << size).data());
- }
- // extract the rope
- TRope::TConstIterator begin = iter;
- iter += len;
- size -= len;
- event->Payload.emplace_back(begin, iter);
- }
- } else {
- Y_FAIL("%s", (TStringBuilder() << "Extended format is not supported for event " << TypeName<TEvent>()).data());
- }
- }
- // parse the protobuf
- TRopeStream stream(iter, size);
- if (!event->Record.ParsePartialFromZeroCopyStream(&stream)) {
- Y_FAIL("%s", (TStringBuilder() << "Failed to parse protobuf event type " << eventMeta.Type << " class " << TypeName<TEvent>()).data());
- }
- }
-
- static bool SerializeProtoEvent(const TEvent* event, TChunkSerializer* chunker) {
- constexpr bool hasPayload = requires(const TEvent* e) {e->Payload;};
- if constexpr (hasPayload) {
- // serialize payload first
- if (event->Payload) {
- void *data;
- int size = 0;
- auto append = [&](const char *p, size_t len) {
- while (len) {
- if (size) {
- const size_t numBytesToCopy = std::min<size_t>(size, len);
- memcpy(data, p, numBytesToCopy);
- data = static_cast<char*>(data) + numBytesToCopy;
- size -= numBytesToCopy;
- p += numBytesToCopy;
- len -= numBytesToCopy;
- } else if (!chunker->Next(&data, &size)) {
- return false;
- }
- }
- return true;
- };
- auto appendNumber = [&](size_t number) {
- char buf[MaxNumberBytes];
- return append(buf, SerializeNumber(number, buf));
- };
- char marker = PayloadMarker;
- append(&marker, 1);
- if (!appendNumber(event->Payload.size())) {
- return false;
- }
- for (const TRope& rope : event->Payload) {
- if (!appendNumber(rope.GetSize())) {
- return false;
- }
- if (rope) {
- if (size) {
- chunker->BackUp(std::exchange(size, 0));
- }
- if (!chunker->WriteRope(&rope)) {
- return false;
- }
- }
- }
- if (size) {
- chunker->BackUp(size);
- }
- }
- }
- return event->Record.SerializeToZeroCopyStream(chunker);
- }
-
- public:
- TEvent* New() {
- return new TEvent();
- }
-
- void Delete(TEvent* event) {
- delete event;
- }
-
- virtual IEventHandle* Construct(const TEventMeta& eventMeta) override {
- IEventHandle* ev = nullptr;
- if (eventMeta.Type == TEvent::EventType) {
- TEvent* event = New();
- DeserializeProtoEvent(event, eventMeta);
- }
- return ev;
- }
-
- virtual void Destruct(IEventHandle* ev) override {
- if (ev->Type != TEvent::EventType) {
- Y_FAIL("Wrong event supplied");
- }
- Delete(static_cast<TEvent*>(ev));
- }
-
- TLightEventFactory() {
- Cerr << "TLightEventFactory<" << TypeName<TEvent>() << ">()" << Endl;
- TLightEventSpaceFactories<GetEventSpace(TEvent::EventType)>::SetEventFactory(TEvent::EventType, this);
- }
- };
-
- template<typename TEv>
- struct TLightEventFactoryInitializer {
- static TLightEventFactory<TEv> EventFactory;
-
- static TEv* New() {
- return EventFactory.New();
- }
-
- static void Delete(TEv* event) {
- EventFactory.Delete(event);
- }
- };
-
- template<typename TEv>
- TLightEventFactory<TEv> TLightEventFactoryInitializer<TEv>::EventFactory;
-
- template<typename TEv, typename TRecord, ui32 EventType0>
- class TEventLightPB : public IEventHandleLightSerializable, public TLightEventFactoryInitializer<TEv> {
- public:
- static constexpr ui32 EventType = EventType0;
-
- TRecord Record;
-
- static bool SerializeProto(const IEventHandleLightSerializable* event, TChunkSerializer* chunker) {
- return TLightEventFactory<TEv>::SerializeProtoEvent(static_cast<const TEv*>(event), chunker);
- }
-
- TEventLightPB()
- : IEventHandleLightSerializable(EventType, &TEventLightPB<TEv, TRecord, EventType0>::SerializeProto)
- {}
-
- TEv* Get() {
- return static_cast<TEv*>(this);
- }
-
- using TPtr = TAutoPtr<TEv>;
- };
-
- template<typename TEv, typename TRecord, ui32 EventType0>
- class TEventLightPBWithPayload : public TEventLightPB<TEv, TRecord, EventType0> {
- public:
- std::vector<TRope> Payload;
- };
}
diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp
index ea5fd10124..70b9ea2d71 100644
--- a/library/cpp/actors/core/events_undelivered.cpp
+++ b/library/cpp/actors/core/events_undelivered.cpp
@@ -38,68 +38,20 @@ namespace NActors {
return new TEvUndelivered(sourceType, reason);
}
- TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(TAutoPtr<IEventHandle>& ev, ui32 reason, bool unsure) {
- std::unique_ptr<IEventHandle> tev(ev.Release());
- TAutoPtr<IEventHandle> fw = ForwardOnNondelivery(tev, reason, unsure);
- if (tev) {
- // we don't want to delete original event handle here
- ev = tev.release();
- }
- return fw;
- }
-
- TAutoPtr<IEventHandle> IEventHandle::ForwardOnNondelivery(std::unique_ptr<IEventHandle>& ev, ui32 reason, bool unsure) {
- if (ev->IsEventFat()) {
- std::unique_ptr<IEventHandleFat> evf(IEventHandleFat::GetFat(ev.release()));
- std::unique_ptr<IEventHandle> fw = IEventHandleFat::ForwardOnNondelivery(evf, reason, unsure);
- if (evf) {
- ev = std::unique_ptr<IEventHandle>(evf.release());
- }
- return fw.release();
- }
- if (ev->IsEventLight()) {
- std::unique_ptr<IEventHandleLight> evl(IEventHandleLight::GetLight(ev.release()));
- std::unique_ptr<IEventHandle> fw = IEventHandleLight::ForwardOnNondelivery(evl, reason, unsure);
- if (evl) {
- ev = std::unique_ptr<IEventHandle>(evl.release());
- }
- return fw.release();
- }
- return {};
- }
-
- std::unique_ptr<IEventHandle> IEventHandleFat::ForwardOnNondelivery(std::unique_ptr<IEventHandleFat>& ev, ui32 reason, bool unsure) {
- if (ev->ForwardOnNondeliveryFlag) {
+ std::unique_ptr<IEventHandle> IEventHandle::ForwardOnNondelivery(std::unique_ptr<IEventHandle>&& ev, ui32 reason, bool unsure) {
+ if (ev->Flags & FlagForwardOnNondelivery) {
const ui32 updatedFlags = ev->Flags & ~(FlagForwardOnNondelivery | FlagSubscribeOnSession);
const TActorId recp = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId();
if (ev->Event)
- return std::unique_ptr<IEventHandle>(new IEventHandleFat(recp, ev->Sender, ev->Event.Release(), updatedFlags, ev->Cookie, &ev->Recipient, std::move(ev->TraceId)));
+ return std::unique_ptr<IEventHandle>(new IEventHandle(recp, ev->Sender, ev->Event.Release(), updatedFlags, ev->Cookie, &ev->Recipient, std::move(ev->TraceId)));
else
- return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Type, updatedFlags, recp, ev->Sender, ev->Buffer, ev->Cookie, &ev->Recipient, std::move(ev->TraceId)));
- }
-
- if (ev->TrackDelivery) {
- const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered);
- return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags,
- ev->Cookie, nullptr, std::move(ev->TraceId)));
- }
- return {};
- }
-
- std::unique_ptr<IEventHandle> IEventHandleLight::ForwardOnNondelivery(std::unique_ptr<IEventHandleLight>& ev, ui32 reason, bool unsure) {
- if (ev->ForwardOnNondeliveryFlag) {
- ev->ForwardOnNondeliveryFlag = false;
- ev->SubscribeOnSession = false;
- auto recpt = ev->Recipient;
- ev->Recipient = ev->OnNondeliveryHolder ? ev->OnNondeliveryHolder->Recipient : TActorId();
- ev->OnNondeliveryHolder = MakeHolder<TOnNondelivery>(recpt);
- return std::unique_ptr<IEventHandle>(ev.release());
+ return std::unique_ptr<IEventHandle>(new IEventHandle(ev->Type, updatedFlags, recp, ev->Sender, ev->Buffer, ev->Cookie, &ev->Recipient, std::move(ev->TraceId)));
}
- if (ev->TrackDelivery) {
+ if (ev->Flags & FlagTrackDelivery) {
const ui32 updatedFlags = ev->Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered);
- return std::unique_ptr<IEventHandle>(new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags,
+ return std::unique_ptr<IEventHandle>(new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->Type, reason, unsure), updatedFlags,
ev->Cookie, nullptr, std::move(ev->TraceId)));
}
return {};
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index c8267fa715..611899c6ef 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -176,7 +176,7 @@ namespace NActors {
NProfiling::TMemoryTagScope::Reset(ActorSystem->MemProfActivityBase + activityType);
}
- actor->Receive(ev);
+ actor->Receive(ev, ctx);
size_t dyingActorsCnt = DyingActors.size();
Ctx.UpdateActorsStats(dyingActorsCnt);
@@ -202,7 +202,7 @@ namespace NActors {
} else {
actorType = nullptr;
- TAutoPtr<IEventHandle> nonDelivered = IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown);
+ TAutoPtr<IEventHandle> nonDelivered = IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::ReasonActorUnknown);
if (nonDelivered.Get()) {
ActorSystem->Send(nonDelivered);
} else {
diff --git a/library/cpp/actors/core/io_dispatcher.cpp b/library/cpp/actors/core/io_dispatcher.cpp
index cccb2336ff..6bd753f2e0 100644
--- a/library/cpp/actors/core/io_dispatcher.cpp
+++ b/library/cpp/actors/core/io_dispatcher.cpp
@@ -116,7 +116,7 @@ namespace NActors {
bool sendNotify;
if (!Actor.TaskQueue.Dequeue(tasks, &sendNotify)) {
if (sendNotify) {
- ActorSystem->Send(new IEventHandleFat(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(),
+ ActorSystem->Send(new IEventHandle(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(),
nullptr, TThread::CurrentThreadId()));
}
break;
diff --git a/library/cpp/actors/core/io_dispatcher.h b/library/cpp/actors/core/io_dispatcher.h
index a33cb0d98e..b0e4e60d1a 100644
--- a/library/cpp/actors/core/io_dispatcher.h
+++ b/library/cpp/actors/core/io_dispatcher.h
@@ -28,7 +28,7 @@ namespace NActors {
*/
template<typename TCallback>
static void InvokeIoCallback(TCallback&& callback, ui32 poolId, IActor::EActivityType activityType) {
- if (!TActivationContext::Send(new IEventHandleFat(MakeIoDispatcherActorId(), TActorId(),
+ if (!TActivationContext::Send(new IEventHandle(MakeIoDispatcherActorId(), TActorId(),
new TEvInvokeQuery(callback)))) {
TActivationContext::Register(CreateExecuteLaterActor(std::move(callback), activityType), TActorId(),
TMailboxType::HTSwap, poolId);
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index c7bed09f1b..7c569a2dad 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -316,7 +316,7 @@ namespace NActors {
{
const NLog::TSettings *mSettings = ctx.LoggerSettings();
TLoggerActor::Throttle(*mSettings);
- ctx.Send(new IEventHandleFat(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
+ ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
}
template <typename TCtx, typename... TArgs>
diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp
index 572af788ec..995e3c4121 100644
--- a/library/cpp/actors/core/log_ut.cpp
+++ b/library/cpp/actors/core/log_ut.cpp
@@ -105,19 +105,19 @@ namespace {
{}
void WriteLog() {
- Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")});
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")});
}
void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg, TString msg = "foo") {
- Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)});
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)});
}
void FlushLogBuffer() {
- Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TFlushLogBuffer()});
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TFlushLogBuffer()});
}
void Wakeup() {
- Runtime.Send(new IEventHandleFat{LoggerActor, {}, new TEvents::TEvWakeup});
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvents::TEvWakeup});
}
TIntrusivePtr<TDynamicCounters> Counters{MakeIntrusive<TDynamicCounters>()};
diff --git a/library/cpp/actors/dnsresolver/dnsresolver.cpp b/library/cpp/actors/dnsresolver/dnsresolver.cpp
index 396194b266..71e7f4d037 100644
--- a/library/cpp/actors/dnsresolver/dnsresolver.cpp
+++ b/library/cpp/actors/dnsresolver/dnsresolver.cpp
@@ -328,7 +328,7 @@ namespace NDnsResolver {
}
result->Status = status;
- reqCtx->ActorSystem->Send(new IEventHandleFat(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie));
+ reqCtx->ActorSystem->Send(new IEventHandle(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie));
break;
}
@@ -356,7 +356,7 @@ namespace NDnsResolver {
}
result->Status = status;
- reqCtx->ActorSystem->Send(new IEventHandleFat(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie));
+ reqCtx->ActorSystem->Send(new IEventHandle(reqCtx->Sender, reqCtx->SelfId, result.Release(), 0, reqCtx->Cookie));
break;
}
}
diff --git a/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp b/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp
index 5f9fd3c444..89a7e9ab36 100644
--- a/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp
+++ b/library/cpp/actors/dnsresolver/dnsresolver_caching_ut.cpp
@@ -256,7 +256,7 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) {
}
void Sleep(TDuration duration) {
- Schedule(new IEventHandleFat(Sleeper, Sleeper, new TEvents::TEvWakeup), duration);
+ Schedule(new IEventHandle(Sleeper, Sleeper, new TEvents::TEvWakeup), duration);
GrabEdgeEventRethrow<TEvents::TEvWakeup>(Sleeper);
}
@@ -272,11 +272,11 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) {
}
void SendGetHostByName(const TActorId& sender, const TString& name, int family = AF_UNSPEC) {
- Send(new IEventHandleFat(Resolver, sender, new TEvDns::TEvGetHostByName(name, family)), 0, true);
+ Send(new IEventHandle(Resolver, sender, new TEvDns::TEvGetHostByName(name, family)), 0, true);
}
void SendGetAddr(const TActorId& sender, const TString& name, int family = AF_UNSPEC) {
- Send(new IEventHandleFat(Resolver, sender, new TEvDns::TEvGetAddr(name, family)), 0, true);
+ Send(new IEventHandle(Resolver, sender, new TEvDns::TEvGetAddr(name, family)), 0, true);
}
TEvDns::TEvGetHostByNameResult::TPtr WaitGetHostByName(const TActorId& sender) {
@@ -387,7 +387,7 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) {
runtime.SendGetAddr(sender, "yandex.ru", AF_UNSPEC);
runtime.ExpectGetAddrSuccess(sender, "2a02:6b8:a::a");
- runtime.Send(new IEventHandleFat(runtime.MockResolver, { }, new TEvents::TEvPoison), 0, true);
+ runtime.Send(new IEventHandle(runtime.MockResolver, { }, new TEvents::TEvPoison), 0, true);
runtime.SendGetAddr(sender, "foo.ru", AF_UNSPEC);
runtime.ExpectGetAddrError(sender, ARES_ENOTINITIALIZED);
}
@@ -640,7 +640,7 @@ Y_UNIT_TEST_SUITE(CachingDnsResolver) {
runtime.SendGetHostByName(sender, "yandex.ru", AF_UNSPEC);
runtime.SendGetAddr(sender, "yandex.ru", AF_UNSPEC);
- runtime.Send(new IEventHandleFat(runtime.Resolver, sender, new TEvents::TEvPoison), 0, true);
+ runtime.Send(new IEventHandle(runtime.Resolver, sender, new TEvents::TEvPoison), 0, true);
runtime.ExpectGetHostByNameError(sender, ARES_ECANCELLED);
runtime.ExpectGetAddrError(sender, ARES_ECANCELLED);
}
diff --git a/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp b/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp
index d15fd54192..2758484552 100644
--- a/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp
+++ b/library/cpp/actors/dnsresolver/dnsresolver_ondemand_ut.cpp
@@ -13,7 +13,7 @@ Y_UNIT_TEST_SUITE(OnDemandDnsResolver) {
runtime.Initialize();
auto sender = runtime.AllocateEdgeActor();
auto resolver = runtime.Register(CreateOnDemandDnsResolver());
- runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)),
+ runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)),
0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender);
UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText);
diff --git a/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp b/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp
index 089bd5179f..0c343a805c 100644
--- a/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp
+++ b/library/cpp/actors/dnsresolver/dnsresolver_ut.cpp
@@ -28,7 +28,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) {
runtime.Initialize();
auto sender = runtime.AllocateEdgeActor();
auto resolver = runtime.Register(CreateSimpleDnsResolver());
- runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)),
+ runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("localhost", AF_UNSPEC)),
0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender);
UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText);
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) {
runtime.Initialize();
auto sender = runtime.AllocateEdgeActor();
auto resolver = runtime.Register(CreateSimpleDnsResolver());
- runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("yandex.ru", AF_UNSPEC)),
+ runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("yandex.ru", AF_UNSPEC)),
0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender);
UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText);
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) {
auto sender = runtime.AllocateEdgeActor();
auto resolver = runtime.Register(CreateSimpleDnsResolver());
- runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetAddr("yandex.ru", AF_UNSPEC)),
+ runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetAddr("yandex.ru", AF_UNSPEC)),
0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetAddrResult>(sender);
UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, 0, ev->Get()->ErrorText);
@@ -72,7 +72,7 @@ Y_UNIT_TEST_SUITE(DnsResolver) {
options.Attempts = 2;
options.Servers.emplace_back(TStringBuilder() << "127.0.0.1:" << server.Port);
auto resolver = runtime.Register(CreateSimpleDnsResolver(options));
- runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)),
+ runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)),
0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender);
UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, ARES_ETIMEOUT, ev->Get()->ErrorText);
@@ -88,9 +88,9 @@ Y_UNIT_TEST_SUITE(DnsResolver) {
options.Attempts = 5;
options.Servers.emplace_back(TStringBuilder() << "127.0.0.1:" << server.Port);
auto resolver = runtime.Register(CreateSimpleDnsResolver(options));
- runtime.Send(new IEventHandleFat(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)),
+ runtime.Send(new IEventHandle(resolver, sender, new TEvDns::TEvGetHostByName("timeout.yandex.ru", AF_INET)),
0, true);
- runtime.Send(new IEventHandleFat(resolver, sender, new TEvents::TEvPoison), 0, true);
+ runtime.Send(new IEventHandle(resolver, sender, new TEvents::TEvPoison), 0, true);
auto ev = runtime.GrabEdgeEventRethrow<TEvDns::TEvGetHostByNameResult>(sender);
UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, ARES_ECANCELLED, ev->Get()->ErrorText);
}
diff --git a/library/cpp/actors/examples/02_discovery/lookup.cpp b/library/cpp/actors/examples/02_discovery/lookup.cpp
index 469fba74fa..979a38d215 100644
--- a/library/cpp/actors/examples/02_discovery/lookup.cpp
+++ b/library/cpp/actors/examples/02_discovery/lookup.cpp
@@ -14,7 +14,7 @@ class TExampleLookupRequestActor : public TActor<TExampleLookupRequestActor> {
void Registered(TActorSystem* sys, const TActorId&) override {
const auto flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession;
- sys->Send(new IEventHandleFat(Replica, SelfId(), new TEvExample::TEvReplicaLookup(Key), flags));
+ sys->Send(new IEventHandle(Replica, SelfId(), new TEvExample::TEvReplicaLookup(Key), flags));
}
void PassAway() override {
diff --git a/library/cpp/actors/helpers/flow_controlled_queue.cpp b/library/cpp/actors/helpers/flow_controlled_queue.cpp
index 4f67f85a3b..104d239481 100644
--- a/library/cpp/actors/helpers/flow_controlled_queue.cpp
+++ b/library/cpp/actors/helpers/flow_controlled_queue.cpp
@@ -102,7 +102,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
Subscribed = true;
}
- TActivationContext::Send(new IEventHandleFat(Target, reqActorId, IEventHandleFat::GetFat(ev.Get())->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
+ TActivationContext::Send(new IEventHandle(Target, reqActorId, ev.Get()->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
}
void PumpQueue() {
@@ -123,7 +123,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
if (reqActor) {
if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
TActivationContext::Send(
- new IEventHandleFat(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
+ new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
);
}
reqActor->PassAway();
@@ -136,7 +136,7 @@ class TFlowControlledRequestQueue : public IActorCallback {
const auto reason = TEvents::TEvUndelivered::Disconnected;
if (ev->Flags & IEventHandle::FlagTrackDelivery) {
TActivationContext::Send(
- new IEventHandleFat(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
+ new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
);
}
}
@@ -175,8 +175,7 @@ public:
auto it = Find(RegisteredRequests, reqActor);
if (it == RegisteredRequests.end())
return;
- IEventHandle::Forward(ev, reqActor->Source);
- TActivationContext::Send(ev);
+ TActivationContext::Send(ev->Forward(reqActor->Source).Release());
const TDuration reqLatency = reqActor->AccumulatedLatency();
if (reqLatency < MinimalSeenLatency)
MinimalSeenLatency = reqLatency;
diff --git a/library/cpp/actors/helpers/future_callback.h b/library/cpp/actors/helpers/future_callback.h
index 4db11c7313..8ca0d99fda 100644
--- a/library/cpp/actors/helpers/future_callback.h
+++ b/library/cpp/actors/helpers/future_callback.h
@@ -7,7 +7,7 @@ namespace NActors {
template <typename EventType>
struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> {
- using TCallback = std::function<void(TAutoPtr<TEventHandleFat<EventType>>&)>;
+ using TCallback = std::function<void(TAutoPtr<TEventHandle<EventType>>&)>;
using TBase = TActor<TActorFutureCallback<EventType>>;
TCallback Callback;
@@ -30,29 +30,4 @@ struct TActorFutureCallback : TActor<TActorFutureCallback<EventType>> {
}
};
-template <typename EventType>
-struct TActorFutureCallbackLight : TActor<TActorFutureCallbackLight<EventType>> {
- using TCallback = std::function<void(TAutoPtr<EventType>&)>;
- using TBase = TActor<TActorFutureCallbackLight<EventType>>;
- TCallback Callback;
-
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::ACTOR_FUTURE_CALLBACK;
- }
-
- TActorFutureCallbackLight(TCallback&& callback)
- : TBase(&TActorFutureCallbackLight::StateWaitForEvent)
- , Callback(std::move(callback))
- {}
-
- STRICT_LIGHTFN(StateWaitForEvent,
- hFunc(EventType, Handle)
- )
-
- void Handle(typename EventType::TPtr ev) {
- Callback(ev);
- TBase::PassAway();
- }
-};
-
} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp
index ed4c0972fb..542f817755 100644
--- a/library/cpp/actors/helpers/selfping_actor_ut.cpp
+++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp
@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {
const TActorId actorId = runtime->Register(actor);
Y_UNUSED(actorId);
- //runtime.Send(new IEventHandleFat(actorId, sender, new TEvSelfPing::TEvPing(0.0)));
+ //runtime.Send(new IEventHandle(actorId, sender, new TEvSelfPing::TEvPing(0.0)));
// TODO check after events are handled
//Sleep(TDuration::Seconds(1));
diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp
index ddc3c67011..fa63783c2d 100644
--- a/library/cpp/actors/http/http_proxy_incoming.cpp
+++ b/library/cpp/actors/http/http_proxy_incoming.cpp
@@ -59,7 +59,7 @@ public:
}
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override {
- return new IEventHandleFat(self, parent, new TEvents::TEvBootstrap());
+ return new IEventHandle(self, parent, new TEvents::TEvBootstrap());
}
void Die(const TActorContext& ctx) override {
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index 7fbcb3f3d6..e06de07867 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -303,22 +303,22 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
actorSystem.DispatchEvents();
NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://127.0.0.1:" + ToString(port) + "/test");
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
- actorSystem.Send(new NActors::IEventHandleFat(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
@@ -336,22 +336,22 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
actorSystem.DispatchEvents();
NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
- actorSystem.Send(new NActors::IEventHandleFat(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
@@ -432,22 +432,22 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
add->CertificateFile = certificateFile.Name();
add->PrivateKeyFile = certificateFile.Name();
/////////
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), add.Release()), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true);
actorSystem.DispatchEvents();
NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test");
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
- actorSystem.Send(new NActors::IEventHandleFat(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
@@ -487,11 +487,11 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
actorSystem.DispatchEvents();
NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
@@ -499,7 +499,7 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
TString longHeader;
longHeader.append(9000, 'X');
httpRequest->Set(longHeader, "data");
- actorSystem.Send(new NActors::IEventHandleFat(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
diff --git a/library/cpp/actors/interconnect/event_filter.h b/library/cpp/actors/interconnect/event_filter.h
index de3fdc2a04..47dabf5f16 100644
--- a/library/cpp/actors/interconnect/event_filter.h
+++ b/library/cpp/actors/interconnect/event_filter.h
@@ -31,7 +31,7 @@ namespace NActors {
evSpaceIndex[subtype] = routes;
}
- bool CheckIncomingEvent(const IEventHandleFat& ev, const TScopeId& localScopeId) const {
+ bool CheckIncomingEvent(const IEventHandle& ev, const TScopeId& localScopeId) const {
TRouteMask routes = 0;
if (const auto& evSpaceIndex = ScopeRoutes[ev.Type >> 16]) {
const ui16 subtype = ev.Type & 65535;
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
index 112a0b1b6e..e2e714a213 100644
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -12,14 +12,14 @@ namespace NActors {
TBrokerLeaseHolder(TActorId waiterId, TActorId brokerId)
: WaiterId(waiterId)
, BrokerId(brokerId) {
- if (TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) {
+ if (TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) {
LeaseRequested = true;
}
}
~TBrokerLeaseHolder() {
if (LeaseRequested) {
- TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerFree()));
+ TActivationContext::Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree()));
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index ce4adcdec2..312eff2666 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -59,12 +59,10 @@ namespace NActors {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
- if (ev.IsEventFat()) {
- if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
- event.Span
- .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
- .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
- }
+ if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
+ event.Span
+ .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
+ .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
}
return std::make_pair(bytes, &event);
}
diff --git a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
index e917a27ea7..a450d16871 100644
--- a/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
+++ b/library/cpp/actors/interconnect/interconnect_proxy_wrapper.cpp
@@ -18,7 +18,7 @@ namespace NActors {
, Mock(mock)
{}
- STATEFN(StateFunc) {
+ STFUNC(StateFunc) {
if (ev->GetTypeRewrite() == TEvents::TSystem::Poison && !Proxy) {
PassAway();
} else {
@@ -32,7 +32,7 @@ namespace NActors {
}
Y_VERIFY(Proxy);
}
- InvokeOtherActor(*Proxy, &IActor::Receive, ev);
+ InvokeOtherActor(*Proxy, &IActor::Receive, ev, ctx);
}
}
};
diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp
index 0b0b112628..d638ff830c 100644
--- a/library/cpp/actors/interconnect/interconnect_resolve.cpp
+++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp
@@ -120,7 +120,7 @@ namespace NActors {
LOG_DEBUG_IC("ICR03", "Host: %s, RESOLVED address", Host.c_str());
auto reply = new TEvAddressInfo;
reply->Address = std::move(addr);
- TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply));
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply));
PassAway();
}
@@ -129,7 +129,7 @@ namespace NActors {
auto reply = std::make_unique<TEvLocalNodeInfo>();
reply->NodeId = *NodeId;
reply->Addresses = std::move(addresses);
- TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, reply.release()));
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release()));
PassAway();
}
@@ -138,7 +138,7 @@ namespace NActors {
auto *event = new TEvResolveError;
event->Explain = errorText;
event->Host = Host;
- TActivationContext::Send(new IEventHandleFat(ReplyTo, ReplyFrom, event));
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, event));
PassAway();
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 122f312fec..32f10c727b 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -334,31 +334,7 @@ namespace NActors {
TEventSerializationInfo serializationInfo{
.IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat),
};
- auto es = GetEventSpace(descr.Type);
- if (es < TEventFactories::EventFactories.size() && TEventFactories::EventFactories[es] != nullptr) {
- const auto& estvec(*TEventFactories::EventFactories[es]);
- auto est = GetEventSubType(descr.Type);
- if (est < estvec.size() && estvec[est] != nullptr) {
- IEventFactory* factory = estvec[est];
- TAutoPtr<IEventHandle> ev = factory->Construct({
- .Session = SessionId,
- .Type = descr.Type,
- .Flags = descr.Flags,
- .Recipient = descr.Recipient,
- .Sender = descr.Sender,
- .Cookie = descr.Cookie,
- .OriginScopeId = Params.PeerScopeId,
- .TraceId = std::move(descr.TraceId),
- .Data = std::move(data),
- });
- if (ev) {
- TActivationContext::Send(ev);
- }
- return;
- }
- }
-
- auto ev = std::make_unique<IEventHandleFat>(SessionId,
+ auto ev = std::make_unique<IEventHandle>(SessionId,
descr.Type,
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 8ee93dfc7a..8562f6a440 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -47,7 +47,7 @@ namespace NActors {
void TInterconnectProxyTCP::Registered(TActorSystem* sys, const TActorId& owner) {
if (!DynamicPtr) {
// perform usual bootstrap for static nodes
- sys->Send(new IEventHandleFat(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
+ sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), owner, nullptr, 0));
}
if (const auto& mon = Common->RegisterMonPage) {
TString path = Sprintf("peer%04" PRIu32, PeerNodeId);
@@ -591,7 +591,7 @@ namespace NActors {
// we have found cancellation request for the pending handshake request; so simply remove it from the
// deque, as we are not interested in failure reason; must likely it happens because of handshake timeout
if (pendingEvent->GetTypeRewrite() == TEvHandshakeFail::EventType) {
- TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(pendingEvent.Release()));
+ TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(pendingEvent.Release()));
LogHandshakeFail(tmp, true);
}
PendingIncomingHandshakeEvents.erase(it);
@@ -605,7 +605,7 @@ namespace NActors {
Y_VERIFY(Session && SessionID);
ValidateEvent(ev, "ForwardSessionEventToSession");
- InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev);
+ InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID));
}
void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) {
@@ -774,7 +774,7 @@ namespace NActors {
for (auto& ev : PendingIncomingHandshakeEvents) {
Send(ev->Sender, new TEvents::TEvPoisonPill);
if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) {
- TEvHandshakeFail::TPtr tmp(static_cast<TEventHandleFat<TEvHandshakeFail>*>(ev.Release()));
+ TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release()));
LogHandshakeFail(tmp, true);
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index ebf02c3f27..71edfccbe2 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -63,10 +63,10 @@ namespace NActors {
TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr);
- STATEFN(StateInit) {
+ STFUNC(StateInit) {
Bootstrap();
if (ev->Type != TEvents::TSystem::Bootstrap) { // for dynamic nodes we do not receive Bootstrap event
- Receive(ev);
+ Receive(ev, ctx);
}
}
@@ -180,7 +180,7 @@ namespace NActors {
} else if (DynamicPtr) {
PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15);
if (!PassAwayScheduled) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(),
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
{}, nullptr, 0));
PassAwayScheduled = true;
}
@@ -205,7 +205,7 @@ namespace NActors {
if (now >= PassAwayTimestamp) {
PassAway();
} else if (PassAwayTimestamp != TMonotonic::Max()) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandleFat(EvPassAwayIfNeeded, 0, SelfId(),
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
{}, nullptr, 0));
} else {
PassAwayScheduled = false;
@@ -370,21 +370,6 @@ namespace NActors {
ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func);
}
- void ValidateEvent(IEventHandle* ev, const char* func) {
- if (SelfId().NodeId() == PeerNodeId) {
- TString msg = Sprintf("Event Type# 0x%08" PRIx32 " TypeRewrite# 0x%08" PRIx32
- " from Sender# %s sent to the proxy for the node itself via Interconnect;"
- " THIS IS NOT A BUG IN INTERCONNECT, check the event sender instead",
- ev->Type, ev->GetTypeRewrite(), ev->Sender.ToString().data());
- LOG_ERROR_IC("ICP03", "%s", msg.data());
- Y_VERIFY_DEBUG(false, "%s", msg.data());
- }
-
- Y_VERIFY(ev->GetTypeRewrite() != TEvInterconnect::EvForward || ev->Recipient.NodeId() == PeerNodeId,
- "Recipient/Proxy NodeId mismatch Recipient# %s Type# 0x%08" PRIx32 " PeerNodeId# %" PRIu32 " Func# %s",
- ev->Recipient.ToString().data(), ev->Type, PeerNodeId, func);
- }
-
// Common with helpers
// All proxy actors share the same information in the object
// read only
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index 316c233af3..aad8677ca4 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -25,7 +25,7 @@ namespace NActors {
}
TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
- return new IEventHandleFat(self, parentId, new TEvents::TEvBootstrap, 0);
+ return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
}
void TInterconnectListenerTCP::Die(const TActorContext& ctx) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 5a93bc0cc8..a336e4a89f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -112,8 +112,8 @@ namespace NActors {
Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
}
- void TInterconnectSessionTCP::Forward(TAutoPtr<IEventHandle>& ev) {
- Proxy->ValidateEvent(ev.Get(), "Forward");
+ void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
+ Proxy->ValidateEvent(ev, "Forward");
LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
++MessagesGot;
@@ -126,7 +126,7 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev.Get());
+ const auto [dataSize, event] = oChannel.Push(*ev);
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
@@ -167,7 +167,7 @@ namespace NActors {
} else if (!RamInQueue) {
Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
RamInQueue = new TEvRam(true);
- auto *ev = new IEventHandleFat(SelfId(), {}, RamInQueue);
+ auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
if (batchPeriod != TDuration()) {
TActivationContext::Schedule(batchPeriod, ev);
@@ -179,7 +179,7 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::Subscribe(TAutoPtr<IEventHandle>& ev) {
+ void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
if (inserted) {
@@ -190,7 +190,7 @@ namespace NActors {
Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie);
}
- void TInterconnectSessionTCP::Unsubscribe(TEvents::TEvUnsubscribe::TPtr ev) {
+ void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 9d8bb90ecd..598a5c9220 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -347,16 +347,16 @@ namespace NActors {
void Terminate(TDisconnectReason reason);
void PassAway() override;
- void Forward(LIGHTFN_SIG);
- void Subscribe(TAutoPtr<IEventHandle>& ev);
- void Unsubscribe(TEvents::TEvUnsubscribe::TPtr);
+ void Forward(STATEFN_SIG);
+ void Subscribe(STATEFN_SIG);
+ void Unsubscribe(STATEFN_SIG);
- STRICT_LIGHTFN(StateFunc,
+ STRICT_STFUNC(StateFunc,
fFunc(TEvInterconnect::EvForward, Forward)
cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
- hFunc(TEvents::TEvUnsubscribe, Unsubscribe)
+ fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe)
cFunc(TEvFlush::EventType, HandleFlush)
hFunc(TEvPollerReady, Handle)
hFunc(TEvPollerRegisterResult, Handle)
@@ -531,7 +531,7 @@ namespace NActors {
auto sender = SelfId();
const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
auto ev = new TEvSessionBufferSizeRequest();
- return new IEventHandleFat(recp, sender, ev, IEventHandle::FlagTrackDelivery);
+ return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
};
RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
Become(&TInterconnectSessionKiller::StateFunc);
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index 65e1d202ff..d460903f35 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -82,7 +82,7 @@ namespace NInterconnect {
)
void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) {
- ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]).Release());
+ ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex]));
if (++SlaveIndex == Slaves.size()) {
SlaveIndex = 0;
}
diff --git a/library/cpp/actors/interconnect/mock/ic_mock.cpp b/library/cpp/actors/interconnect/mock/ic_mock.cpp
index 5619f0f113..0aadc7ae35 100644
--- a/library/cpp/actors/interconnect/mock/ic_mock.cpp
+++ b/library/cpp/actors/interconnect/mock/ic_mock.cpp
@@ -60,11 +60,11 @@ namespace NActors {
TPeerInfo *peer = GetPeer(peerNodeId);
auto guard = TReadGuard(peer->Mutex);
if (peer->ActorSystem) {
- peer->ActorSystem->Send(new IEventHandleFat(peer->ProxyId, TActorId(), new TEvInject(std::move(messages),
+ peer->ActorSystem->Send(new IEventHandle(peer->ProxyId, TActorId(), new TEvInject(std::move(messages),
originScopeId, senderSessionId)));
} else {
for (auto&& ev : messages) {
- TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected));
}
}
}
@@ -78,7 +78,7 @@ namespace NActors {
TPeerInfo *peer = GetPeer(peerNodeId);
auto guard = TReadGuard(peer->Mutex);
if (peer->ActorSystem) {
- peer->ActorSystem->Send(new IEventHandleFat(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0));
+ peer->ActorSystem->Send(new IEventHandle(EvCheckSession, 0, peer->ProxyId, {}, nullptr, 0));
}
}
@@ -114,7 +114,7 @@ namespace NActors {
void Terminate() {
for (auto&& ev : std::exchange(Queue, {})) {
- TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected));
}
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
@@ -130,7 +130,7 @@ namespace NActors {
Subscribe(ev->Sender, ev->Cookie);
}
if (Queue.empty()) {
- TActivationContext::Send(new IEventHandleFat(EvRam, 0, SelfId(), {}, {}, 0));
+ TActivationContext::Send(new IEventHandle(EvRam, 0, SelfId(), {}, {}, 0));
}
Queue.emplace_back(ev.Release());
}
@@ -193,7 +193,7 @@ namespace NActors {
}
template <typename TEvent>
- bool CheckNodeStatus(TAutoPtr<TEventHandleFat<TEvent>>& ev) {
+ bool CheckNodeStatus(TAutoPtr<TEventHandle<TEvent>>& ev) {
if (PeerNodeStatus != EPeerNodeStatus::EXISTS) {
std::unique_ptr<IEventHandle> tmp(ev.Release());
CheckNonexistentNode(tmp);
@@ -224,7 +224,7 @@ namespace NActors {
if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, ev->Cookie);
}
- TActivationContext::Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::Disconnected));
+ TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), TEvents::TEvUndelivered::Disconnected));
break;
case TEvents::TEvSubscribe::EventType:
@@ -252,7 +252,7 @@ namespace NActors {
while (!WaitingConnections.empty()) {
TAutoPtr<IEventHandle> tmp(WaitingConnections.front().release());
WaitingConnections.pop_front();
- Receive(tmp);
+ Receive(tmp, TActivationContext::AsActorContext());
}
}
};
@@ -287,28 +287,20 @@ namespace NActors {
return; // drop messages from other sessions
}
if (auto *session = GetSession()) {
- for (auto&& evb : ev->Get()->Messages) {
- if (ev->IsEventLight()) {
- // TODO(xenoxeno):
- //if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
- TActivationContext::Send(evb.release());
- //}
- } else {
- auto* ev = IEventHandleFat::GetFat(evb);
- auto fw = std::make_unique<IEventHandleFat>(
- session->SelfId(),
- ev->Type,
- ev->Flags & ~IEventHandle::FlagForwardOnNondelivery,
- ev->Recipient,
- ev->Sender,
- ev->ReleaseChainBuffer(),
- ev->Cookie,
- msg->OriginScopeId,
- std::move(ev->TraceId)
- );
- if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
- TActivationContext::Send(fw.release());
- }
+ for (auto&& ev : ev->Get()->Messages) {
+ auto fw = std::make_unique<IEventHandle>(
+ session->SelfId(),
+ ev->Type,
+ ev->Flags & ~IEventHandle::FlagForwardOnNondelivery,
+ ev->Recipient,
+ ev->Sender,
+ ev->ReleaseChainBuffer(),
+ ev->Cookie,
+ msg->OriginScopeId,
+ std::move(ev->TraceId)
+ );
+ if (!Common->EventFilter || Common->EventFilter->CheckIncomingEvent(*fw, Common->LocalScopeId)) {
+ TActivationContext::Send(fw.release());
}
}
}
@@ -330,7 +322,8 @@ namespace NActors {
void HandleSessionEvent(TAutoPtr<IEventHandle> ev) {
auto *session = GetSession();
- InvokeOtherActor(*session, &TSessionMockActor::Receive, ev);
+ InvokeOtherActor(*session, &TSessionMockActor::Receive, ev,
+ TActivationContext::ActorContextFor(session->SelfId()));
}
void Disconnect() {
@@ -351,7 +344,7 @@ namespace NActors {
return State.Inject(PeerNodeId, std::move(messages), Common->LocalScopeId, Session->SessionId);
}
- STRICT_LIGHTFN(StateFunc,
+ STRICT_STFUNC(StateFunc,
cFunc(TEvents::TSystem::Poison, PassAway)
fFunc(TEvInterconnect::EvForward, HandleSessionEvent)
fFunc(TEvInterconnect::EvConnectNode, HandleSessionEvent)
diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp
index 7b0bdf3e17..9ba173e330 100644
--- a/library/cpp/actors/interconnect/packet.cpp
+++ b/library/cpp/actors/interconnect/packet.cpp
@@ -17,25 +17,14 @@ ui32 TEventHolder::Fill(IEventHandle& ev) {
EventActuallySerialized = 0;
Descr.Checksum = 0;
- if (ev.IsEventLight()) {
- if (ev.IsEventSerializable()) {
- NActors::IEventHandleLightSerializable& serializable(*NActors::IEventHandleLightSerializable::GetLightSerializable(&ev));
- EventSerializer = serializable.Serializer;
- EventSerializedSize = 100;
- } else {
- EventSerializedSize = 0;
- }
+ if (ev.HasBuffer()) {
+ Buffer = ev.ReleaseChainBuffer();
+ EventSerializedSize = Buffer->GetSize();
+ } else if (ev.HasEvent()) {
+ Event.Reset(ev.ReleaseBase());
+ EventSerializedSize = Event->CalculateSerializedSize();
} else {
- auto& evFat = *IEventHandleFat::GetFat(&ev);
- if (evFat.HasBuffer()) {
- Buffer = evFat.ReleaseChainBuffer();
- EventSerializedSize = Buffer->GetSize();
- } else if (evFat.HasEvent()) {
- Event.Reset(evFat.ReleaseBase());
- EventSerializedSize = Event->CalculateSerializedSize();
- } else {
- EventSerializedSize = 0;
- }
+ EventSerializedSize = 0;
}
return EventSerializedSize;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index c06e648541..f3c506a663 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -113,7 +113,6 @@ struct TEventHolder : TNonCopyable {
TActorId ForwardRecipient;
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
- NActors::TEventSerializer EventSerializer;
ui64 Serial;
ui32 EventSerializedSize;
ui32 EventActuallySerialized;
@@ -138,11 +137,10 @@ struct TEventHolder : TNonCopyable {
const TActorId& s = d.Sender;
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
Span.EndError("nondelivery");
- TAutoPtr<IEventHandle> ev = Event
- ? new IEventHandleFat(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
- : new IEventHandleFat(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
- ev = IEventHandle::ForwardOnNondelivery(ev, NActors::TEvents::TEvUndelivered::Disconnected, unsure);
- NActors::TActivationContext::Send(ev);
+ auto ev = Event
+ ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
+ : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
+ NActors::TActivationContext::Send(IEventHandle::ForwardOnNondelivery(std::move(ev), NActors::TEvents::TEvUndelivered::Disconnected, unsure));
}
void Clear() {
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index 7161c6ca90..e75cbcaef4 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -104,7 +104,7 @@ namespace NActors {
protected:
void Notify(TSocketRecord *record, bool read, bool write) {
auto issue = [&](const TActorId& recipient) {
- ActorSystem->Send(new IEventHandleFat(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
+ ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
};
if (read && record->ReadActorId) {
issue(record->ReadActorId);
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 9a541aeb86..b1d2e02f49 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -64,8 +64,6 @@ namespace NActors {
using NActors::IEventBase;
using NActors::IEventHandle;
-using NActors::IEventHandleFat;
-using NActors::IEventHandleLight;
using NActors::TActorId;
using NActors::TConstIoVec;
using NActors::TEventSerializedData;
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 561248c3e5..32c8237b59 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandleFat>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
index 78158f07cc..3c474979dc 100644
--- a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
@@ -106,7 +106,7 @@ void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui3
const TActorId target = MakeResponderServiceId(nodeId);
for (ui32 i = 0; i < count; ++i) {
const ui32 flags = IEventHandle::FlagTrackDelivery;
- as->Send(new IEventHandleFat(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i));
+ as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i));
}
}
diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
index 000f5d4b3e..3596bffd5a 100644
--- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
@@ -46,30 +46,25 @@ public:
}
const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);
InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));
- TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
+ TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie));
// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);
++NextCookie;
}
}
- void HandlePong(TAutoPtr<IEventHandle> e) {
+ void HandlePong(TAutoPtr<IEventHandle> ev) {
// Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl);
- if (e->IsEventFat()) {
- auto ev = IEventHandleFat::GetFat(e);
- if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
- auto& [s2cIt, hash] = it->second;
- Y_VERIFY(hash == ev->GetChainBuffer()->GetString());
- SessionToCookie.erase(s2cIt);
- InFlight.erase(it);
- } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) {
- Y_VERIFY(it->second == ev->GetChainBuffer()->GetString());
- Tentative.erase(it);
- } else {
- Y_FAIL("Cookie# %" PRIu64, ev->Cookie);
- }
+ if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
+ auto& [s2cIt, hash] = it->second;
+ Y_VERIFY(hash == ev->GetChainBuffer()->GetString());
+ SessionToCookie.erase(s2cIt);
+ InFlight.erase(it);
+ } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) {
+ Y_VERIFY(it->second == ev->GetChainBuffer()->GetString());
+ Tentative.erase(it);
} else {
- Y_FAIL("Pong is not fat");
+ Y_FAIL("Cookie# %" PRIu64, ev->Cookie);
}
IssueQueries();
}
@@ -128,13 +123,10 @@ public:
{}
void HandlePing(TAutoPtr<IEventHandle>& ev) {
- if (ev->IsEventFat()) {
- auto* evf = IEventHandleFat::GetFat(ev);
- const TString& data = evf->GetChainBuffer()->GetString();
- const TString& response = MD5::CalcRaw(data);
- TActivationContext::Send(new IEventHandleFat(TEvents::THelloWorld::Pong, 0, evf->Sender, SelfId(),
- MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), evf->Cookie));
- }
+ const TString& data = ev->GetChainBuffer()->GetString();
+ const TString& response = MD5::CalcRaw(data);
+ TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(),
+ MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie));
}
STRICT_STFUNC(StateFunc,
diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
index 745a020d2a..23d846a2fd 100644
--- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
@@ -253,7 +253,7 @@ public:
private:
void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) {
auto ev = new TEvPollerRegister{socket, readActorId, writeActorId};
- ActorSystem_->Send(new IEventHandleFat(PollerId_, TActorId{}, ev));
+ ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
}
private:
diff --git a/library/cpp/actors/testlib/decorator_ut.cpp b/library/cpp/actors/testlib/decorator_ut.cpp
index 3c70d25ec0..fe5c769290 100644
--- a/library/cpp/actors/testlib/decorator_ut.cpp
+++ b/library/cpp/actors/testlib/decorator_ut.cpp
@@ -28,7 +28,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) {
virtual ~TDyingChecker() {
Write("TDyingChecker::~TDyingChecker");
- TActivationContext::Send(new IEventHandleFat(MasterId, SelfId(), new TEvents::TEvPing()));
+ TActivationContext::Send(new IEventHandle(MasterId, SelfId(), new TEvents::TEvPing()));
}
bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
@@ -104,7 +104,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) {
return true;
}
Write("TFizzBuzzToFooBar::DoBeforeSending");
- TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get());
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
UNIT_ASSERT(handle);
TEvWords *event = handle->Get();
TVector<TString> &words = event->Words;
@@ -144,7 +144,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) {
return true;
}
Write("TWordEraser::DoBeforeSending");
- TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get());
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
UNIT_ASSERT(handle);
TEvWords *event = handle->Get();
TVector<TString> &words = event->Words;
@@ -176,7 +176,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) {
return true;
}
Write("TWithoutWordsDroper::DoBeforeSending");
- TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get());
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
UNIT_ASSERT(handle);
TEvWords *event = handle->Get();
return bool(event->Words);
@@ -208,7 +208,7 @@ Y_UNIT_TEST_SUITE(TesTTestDecorator) {
}
STATEFN(State) {
- TEventHandleFat<TEvWords> *handle = reinterpret_cast<TEventHandleFat<TEvWords>*>(ev.Get());
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
UNIT_ASSERT(handle);
UNIT_ASSERT(handle->Sender == MasterId);
TEvWords *event = handle->Get();
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 889edc4969..962cfe81d4 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -48,10 +48,10 @@ namespace NActors {
Cerr << ", ";
if (ev->HasEvent())
Cerr << " : " << (PRINT_EVENT_BODY ? ev->ToString() : ev->GetTypeName());
- // else if (ev->HasBuffer())
- // Cerr << " : BUFFER";
- // else
- // Cerr << " : EMPTY";
+ else if (ev->HasBuffer())
+ Cerr << " : BUFFER";
+ else
+ Cerr << " : EMPTY";
Cerr << "\n";
}
@@ -397,7 +397,7 @@ namespace NActors {
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
TActivationContext *prevTlsActivationContext = TlsActivationContext;
TlsActivationContext = &ctx;
- recipientActor->Receive(ev);
+ recipientActor->Receive(ev, ctx);
TlsActivationContext = prevTlsActivationContext;
// we expect the logger to never die in tests
}
@@ -1420,9 +1420,8 @@ namespace NActors {
}
}
- void TTestActorRuntimeBase::Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventHandleLight> ev, ui32 senderNodeIndex, bool viaActorSystem) {
- ev->PrepareSend(recipient, sender);
- Send(ev.Release(), senderNodeIndex, viaActorSystem);
+ void TTestActorRuntimeBase::Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 senderNodeIndex, bool viaActorSystem) {
+ Send(new IEventHandle(recipient, sender, ev.Release()), senderNodeIndex, viaActorSystem);
}
void TTestActorRuntimeBase::Send(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex, bool viaActorSystem) {
@@ -1628,7 +1627,7 @@ namespace NActors {
TCallstack::GetTlsCallstack() = ev->Callstack;
TCallstack::GetTlsCallstack().SetLinesToSkip();
#endif
- recipientActor->Receive(ev);
+ recipientActor->Receive(ev, ctx);
node->ExecutorThread->DropUnregistered();
}
CurrentRecipient = TActorId();
@@ -1866,8 +1865,7 @@ namespace NActors {
if (HasReply) {
delete Context->Queue->Pop();
}
- IEventHandle::Forward(ev, originalSender);
- ctx.ExecutorThread.Send(ev);
+ ctx.ExecutorThread.Send(IEventHandle::Forward(ev, originalSender));
if (!IsSync && Context->Queue->Head()) {
SendHead(ctx);
}
@@ -1899,17 +1897,9 @@ namespace NActors {
TAutoPtr<IEventHandle> GetForwardedEvent() {
IEventHandle* ev = Context->Queue->Head();
ReplyChecker->OnRequest(ev);
- TAutoPtr<IEventHandle> forwardedEv;
- if (ev->IsEventLight()) {
- IEventHandleLight* evl = IEventHandleLight::GetLight(ev);
- evl->PrepareSend(Delegatee, ReplyId);
- forwardedEv = ev;
- } else {
- IEventHandleFat* evf = IEventHandleFat::GetFat(ev);
- forwardedEv = ev->HasEvent()
- ? new IEventHandleFat(Delegatee, ReplyId, evf->ReleaseBase().Release(), evf->Flags, evf->Cookie)
- : new IEventHandleFat(evf->GetTypeRewrite(), evf->Flags, Delegatee, ReplyId, evf->ReleaseChainBuffer(), evf->Cookie);
- }
+ TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
+ ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
+ : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
return forwardedEv;
}
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index c48708b975..0c1e4207cc 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -260,7 +260,7 @@ namespace NActors {
bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions());
bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout);
bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline);
- void Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventHandleLight> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
+ void Send(const TActorId& recipient, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
void Send(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
void SendAsync(TAutoPtr<IEventHandle> ev, ui32 senderNodeIndex = 0);
void Schedule(TAutoPtr<IEventHandle> ev, const TDuration& duration, ui32 nodeIndex = 0);
@@ -371,7 +371,7 @@ namespace NActors {
std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; };
GrabEdgeEventIf(handle, truth, simTimeout);
if (handle) {
- return THolder<TEvent>(IEventHandle::Release<TEvent>(handle));
+ return THolder<TEvent>(handle->Release<TEvent>());
}
return {};
}
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
index 42aa3a6fbd..dcd458be7c 100644
--- a/library/cpp/actors/wilson/wilson_span.cpp
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -54,7 +54,7 @@ namespace NWilson {
void TSpan::Send() {
if (TlsActivationContext) {
- TActivationContext::Send(new IEventHandleFat(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
+ TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
}
Data->Sent = true;
}
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp
index b9b4ee2b21..3e47a8a315 100644
--- a/library/cpp/actors/wilson/wilson_uploader.cpp
+++ b/library/cpp/actors/wilson/wilson_uploader.cpp
@@ -160,7 +160,7 @@ namespace NWilson {
template<typename T>
void ScheduleWakeup(T&& deadline) {
if (!WakeupScheduled) {
- TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::Wakeup, 0, SelfId(), {},
+ TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {},
nullptr, 0));
WakeupScheduled = true;
}