aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/event.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/core/event.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/core/event.h')
-rw-r--r--library/cpp/actors/core/event.h344
1 files changed, 344 insertions, 0 deletions
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
new file mode 100644
index 0000000000..6ff02aaf94
--- /dev/null
+++ b/library/cpp/actors/core/event.h
@@ -0,0 +1,344 @@
+#pragma once
+
+#include "defs.h"
+#include "actorid.h"
+#include "callstack.h"
+#include "event_load.h"
+
+#include <library/cpp/actors/wilson/wilson_trace.h>
+
+#include <util/system/hp_timer.h>
+#include <util/generic/maybe.h>
+
+namespace NActors {
+ class TChunkSerializer;
+
+ class ISerializerToStream {
+ public:
+ virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
+ };
+
+ class IEventBase
+ : TNonCopyable,
+ public ISerializerToStream {
+ public:
+ // actual typing is performed by IEventHandle
+
+ virtual ~IEventBase() {
+ }
+
+ virtual TString ToStringHeader() const = 0;
+ virtual TString ToString() const {
+ return ToStringHeader();
+ }
+ virtual ui32 CalculateSerializedSize() const {
+ return 0;
+ }
+ virtual ui32 Type() const = 0;
+ virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
+ virtual bool IsSerializable() const = 0;
+ virtual bool IsExtendedFormat() const {
+ return false;
+ }
+ virtual ui32 CalculateSerializedSizeCached() const {
+ return CalculateSerializedSize();
+ }
+ };
+
+ // fat handle
+ class IEventHandle : TNonCopyable {
+ struct TOnNondelivery {
+ TActorId Recipient;
+
+ TOnNondelivery(const TActorId& recipient)
+ : Recipient(recipient)
+ {
+ }
+ };
+
+ public:
+ template <typename TEv>
+ inline TEv* CastAsLocal() const noexcept {
+ auto fits = GetTypeRewrite() == TEv::EventType;
+
+ return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
+ }
+
+ template <typename TEventType>
+ TEventType* Get() {
+ if (Type != TEventType::EventType)
+ Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);
+
+ if (!Event) {
+ Event.Reset(TEventType::Load(Buffer.Get()));
+ }
+
+ if (Event) {
+ return static_cast<TEventType*>(Event.Get());
+ }
+
+ Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
+ }
+
+ template <typename T>
+ TAutoPtr<T> Release() {
+ TAutoPtr<T> x = Get<T>();
+ Y_UNUSED(Event.Release());
+ Buffer.Reset();
+ return x;
+ }
+
+ enum EFlags {
+ FlagTrackDelivery = 1 << 0,
+ FlagForwardOnNondelivery = 1 << 1,
+ FlagSubscribeOnSession = 1 << 2,
+ FlagUseSubChannel = 1 << 3,
+ FlagGenerateUnsureUndelivered = 1 << 4,
+ FlagExtendedFormat = 1 << 5,
+ };
+
+ const ui32 Type;
+ const ui32 Flags;
+ const TActorId Recipient;
+ const TActorId Sender;
+ const ui64 Cookie;
+ const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect
+
+ // if set, used by ActorSystem/Interconnect to report tracepoints
+ NWilson::TTraceId TraceId;
+
+ // filled if feeded by interconnect session
+ const TActorId InterconnectSession;
+
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ ::NHPTimer::STime SendTime;
+#endif
+
+ static const size_t ChannelBits = 12;
+ static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
+
+#ifdef USE_ACTOR_CALLSTACK
+ TCallstack Callstack;
+#endif
+ ui16 GetChannel() const noexcept {
+ return Flags >> ChannelShift;
+ }
+
+ ui64 GetSubChannel() const noexcept {
+ return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL;
+ }
+
+ static ui32 MakeFlags(ui32 channel, ui32 flags) {
+ Y_VERIFY(channel < (1 << ChannelBits));
+ Y_VERIFY(flags < (1 << ChannelShift));
+ return (flags | (channel << ChannelShift));
+ }
+
+ private:
+ THolder<IEventBase> Event;
+ TIntrusivePtr<TEventSerializedData> Buffer;
+
+ TActorId RewriteRecipient;
+ ui32 RewriteType;
+
+ THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events
+
+ public:
+ void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
+ RewriteRecipient = recipientRewrite;
+ RewriteType = typeRewrite;
+ }
+
+ void DropRewrite() {
+ RewriteRecipient = Recipient;
+ RewriteType = Type;
+ }
+
+ const TActorId& GetRecipientRewrite() const {
+ return RewriteRecipient;
+ }
+
+ ui32 GetTypeRewrite() const {
+ return RewriteType;
+ }
+
+ TActorId GetForwardOnNondeliveryRecipient() const {
+ return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
+ }
+
+ IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
+ const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
+ : Type(ev->Type())
+ , Flags(flags)
+ , Recipient(recipient)
+ , Sender(sender)
+ , Cookie(cookie)
+ , TraceId(std::move(traceId))
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ , SendTime(0)
+#endif
+ , Event(ev)
+ , RewriteRecipient(Recipient)
+ , RewriteType(Type)
+ {
+ if (forwardOnNondelivery)
+ OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
+ }
+
+ IEventHandle(ui32 type,
+ ui32 flags,
+ const TActorId& recipient,
+ const TActorId& sender,
+ TIntrusivePtr<TEventSerializedData> buffer,
+ ui64 cookie,
+ const TActorId* forwardOnNondelivery = nullptr,
+ NWilson::TTraceId traceId = {})
+ : Type(type)
+ , Flags(flags)
+ , Recipient(recipient)
+ , Sender(sender)
+ , Cookie(cookie)
+ , TraceId(std::move(traceId))
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ , SendTime(0)
+#endif
+ , Buffer(std::move(buffer))
+ , RewriteRecipient(Recipient)
+ , RewriteType(Type)
+ {
+ if (forwardOnNondelivery)
+ OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
+ }
+
+ // Special ctor for events from interconnect.
+ IEventHandle(const TActorId& session,
+ ui32 type,
+ ui32 flags,
+ const TActorId& recipient,
+ const TActorId& sender,
+ TIntrusivePtr<TEventSerializedData> buffer,
+ ui64 cookie,
+ TScopeId originScopeId,
+ NWilson::TTraceId traceId) noexcept
+ : Type(type)
+ , Flags(flags)
+ , Recipient(recipient)
+ , Sender(sender)
+ , Cookie(cookie)
+ , OriginScopeId(originScopeId)
+ , TraceId(std::move(traceId))
+ , InterconnectSession(session)
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ , SendTime(0)
+#endif
+ , Buffer(std::move(buffer))
+ , RewriteRecipient(Recipient)
+ , RewriteType(Type)
+ {
+ }
+
+ TIntrusivePtr<TEventSerializedData> GetChainBuffer();
+ TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer();
+
+ ui32 GetSize() const {
+ if (Buffer) {
+ return Buffer->GetSize();
+ } else if (Event) {
+ return Event->CalculateSerializedSize();
+ } else {
+ return 0;
+ }
+ }
+
+ bool HasBuffer() const {
+ return bool(Buffer);
+ }
+
+ bool HasEvent() const {
+ return bool(Event);
+ }
+
+ IEventBase* GetBase() {
+ if (!Event) {
+ if (!Buffer)
+ return nullptr;
+ else
+ ythrow TWithBackTrace<yexception>() << "don't know how to load the event from buffer";
+ }
+
+ return Event.Get();
+ }
+
+ TAutoPtr<IEventBase> ReleaseBase() {
+ TAutoPtr<IEventBase> x = GetBase();
+ Y_UNUSED(Event.Release());
+ Buffer.Reset();
+ return x;
+ }
+
+ TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
+ if (Event)
+ return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
+ else
+ return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
+ }
+
+ TAutoPtr<IEventHandle> ForwardOnNondelivery(ui32 reason, bool unsure = false);
+ };
+
+ template <typename TEventType>
+ class TEventHandle: public IEventHandle {
+ TEventHandle(); // we never made instance of TEventHandle
+ public:
+ TEventType* Get() {
+ return IEventHandle::Get<TEventType>();
+ }
+
+ TAutoPtr<TEventType> Release() {
+ return IEventHandle::Release<TEventType>();
+ }
+ };
+
+ static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)");
+
+ template <typename TEventType, ui32 EventType0>
+ class TEventBase: public IEventBase {
+ public:
+ static constexpr ui32 EventType = EventType0;
+ ui32 Type() const override {
+ return EventType0;
+ }
+ // still abstract
+
+ typedef TEventHandle<TEventType> THandle;
+ typedef TAutoPtr<THandle> TPtr;
+ };
+
+#define DEFINE_SIMPLE_LOCAL_EVENT(eventType, header) \
+ TString ToStringHeader() const override { \
+ return TString(header); \
+ } \
+ bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
+ Y_FAIL("Local event " #eventType " is not serializable"); \
+ } \
+ static IEventBase* Load(NActors::TEventSerializedData*) { \
+ Y_FAIL("Local event " #eventType " has no load method"); \
+ } \
+ bool IsSerializable() const override { \
+ return false; \
+ }
+
+#define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header) \
+ TString ToStringHeader() const override { \
+ return TString(header); \
+ } \
+ bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
+ return true; \
+ } \
+ static IEventBase* Load(NActors::TEventSerializedData*) { \
+ return new eventType(); \
+ } \
+ bool IsSerializable() const override { \
+ return true; \
+ }
+}