diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/core/event.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/core/event.h')
-rw-r--r-- | library/cpp/actors/core/event.h | 344 |
1 files changed, 344 insertions, 0 deletions
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h new file mode 100644 index 0000000000..6ff02aaf94 --- /dev/null +++ b/library/cpp/actors/core/event.h @@ -0,0 +1,344 @@ +#pragma once + +#include "defs.h" +#include "actorid.h" +#include "callstack.h" +#include "event_load.h" + +#include <library/cpp/actors/wilson/wilson_trace.h> + +#include <util/system/hp_timer.h> +#include <util/generic/maybe.h> + +namespace NActors { + class TChunkSerializer; + + class ISerializerToStream { + public: + virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; + }; + + class IEventBase + : TNonCopyable, + public ISerializerToStream { + public: + // actual typing is performed by IEventHandle + + virtual ~IEventBase() { + } + + virtual TString ToStringHeader() const = 0; + virtual TString ToString() const { + return ToStringHeader(); + } + virtual ui32 CalculateSerializedSize() const { + return 0; + } + virtual ui32 Type() const = 0; + virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; + virtual bool IsSerializable() const = 0; + virtual bool IsExtendedFormat() const { + return false; + } + virtual ui32 CalculateSerializedSizeCached() const { + return CalculateSerializedSize(); + } + }; + + // fat handle + class IEventHandle : TNonCopyable { + struct TOnNondelivery { + TActorId Recipient; + + TOnNondelivery(const TActorId& recipient) + : Recipient(recipient) + { + } + }; + + public: + template <typename TEv> + inline TEv* CastAsLocal() const noexcept { + auto fits = GetTypeRewrite() == TEv::EventType; + + return fits ? static_cast<TEv*>(Event.Get()) : nullptr; + } + + template <typename TEventType> + TEventType* Get() { + if (Type != TEventType::EventType) + Y_FAIL("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType); + + if (!Event) { + Event.Reset(TEventType::Load(Buffer.Get())); + } + + if (Event) { + return static_cast<TEventType*>(Event.Get()); + } + + Y_FAIL("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data()); + } + + template <typename T> + TAutoPtr<T> Release() { + TAutoPtr<T> x = Get<T>(); + Y_UNUSED(Event.Release()); + Buffer.Reset(); + return x; + } + + enum EFlags { + FlagTrackDelivery = 1 << 0, + FlagForwardOnNondelivery = 1 << 1, + FlagSubscribeOnSession = 1 << 2, + FlagUseSubChannel = 1 << 3, + FlagGenerateUnsureUndelivered = 1 << 4, + FlagExtendedFormat = 1 << 5, + }; + + const ui32 Type; + const ui32 Flags; + const TActorId Recipient; + const TActorId Sender; + const ui64 Cookie; + const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect + + // if set, used by ActorSystem/Interconnect to report tracepoints + NWilson::TTraceId TraceId; + + // filled if feeded by interconnect session + const TActorId InterconnectSession; + +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + ::NHPTimer::STime SendTime; +#endif + + static const size_t ChannelBits = 12; + static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits; + +#ifdef USE_ACTOR_CALLSTACK + TCallstack Callstack; +#endif + ui16 GetChannel() const noexcept { + return Flags >> ChannelShift; + } + + ui64 GetSubChannel() const noexcept { + return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL; + } + + static ui32 MakeFlags(ui32 channel, ui32 flags) { + Y_VERIFY(channel < (1 << ChannelBits)); + Y_VERIFY(flags < (1 << ChannelShift)); + return (flags | (channel << ChannelShift)); + } + + private: + THolder<IEventBase> Event; + TIntrusivePtr<TEventSerializedData> Buffer; + + TActorId RewriteRecipient; + ui32 RewriteType; + + THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events + + public: + void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) { + RewriteRecipient = recipientRewrite; + RewriteType = typeRewrite; + } + + void DropRewrite() { + RewriteRecipient = Recipient; + RewriteType = Type; + } + + const TActorId& GetRecipientRewrite() const { + return RewriteRecipient; + } + + ui32 GetTypeRewrite() const { + return RewriteType; + } + + TActorId GetForwardOnNondeliveryRecipient() const { + return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId(); + } + + IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, + const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) + : Type(ev->Type()) + , Flags(flags) + , Recipient(recipient) + , Sender(sender) + , Cookie(cookie) + , TraceId(std::move(traceId)) +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + , SendTime(0) +#endif + , Event(ev) + , RewriteRecipient(Recipient) + , RewriteType(Type) + { + if (forwardOnNondelivery) + OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery)); + } + + IEventHandle(ui32 type, + ui32 flags, + const TActorId& recipient, + const TActorId& sender, + TIntrusivePtr<TEventSerializedData> buffer, + ui64 cookie, + const TActorId* forwardOnNondelivery = nullptr, + NWilson::TTraceId traceId = {}) + : Type(type) + , Flags(flags) + , Recipient(recipient) + , Sender(sender) + , Cookie(cookie) + , TraceId(std::move(traceId)) +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + , SendTime(0) +#endif + , Buffer(std::move(buffer)) + , RewriteRecipient(Recipient) + , RewriteType(Type) + { + if (forwardOnNondelivery) + OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery)); + } + + // Special ctor for events from interconnect. + IEventHandle(const TActorId& session, + ui32 type, + ui32 flags, + const TActorId& recipient, + const TActorId& sender, + TIntrusivePtr<TEventSerializedData> buffer, + ui64 cookie, + TScopeId originScopeId, + NWilson::TTraceId traceId) noexcept + : Type(type) + , Flags(flags) + , Recipient(recipient) + , Sender(sender) + , Cookie(cookie) + , OriginScopeId(originScopeId) + , TraceId(std::move(traceId)) + , InterconnectSession(session) +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + , SendTime(0) +#endif + , Buffer(std::move(buffer)) + , RewriteRecipient(Recipient) + , RewriteType(Type) + { + } + + TIntrusivePtr<TEventSerializedData> GetChainBuffer(); + TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer(); + + ui32 GetSize() const { + if (Buffer) { + return Buffer->GetSize(); + } else if (Event) { + return Event->CalculateSerializedSize(); + } else { + return 0; + } + } + + bool HasBuffer() const { + return bool(Buffer); + } + + bool HasEvent() const { + return bool(Event); + } + + IEventBase* GetBase() { + if (!Event) { + if (!Buffer) + return nullptr; + else + ythrow TWithBackTrace<yexception>() << "don't know how to load the event from buffer"; + } + + return Event.Get(); + } + + TAutoPtr<IEventBase> ReleaseBase() { + TAutoPtr<IEventBase> x = GetBase(); + Y_UNUSED(Event.Release()); + Buffer.Reset(); + return x; + } + + TAutoPtr<IEventHandle> Forward(const TActorId& dest) { + if (Event) + return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId)); + else + return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId)); + } + + TAutoPtr<IEventHandle> ForwardOnNondelivery(ui32 reason, bool unsure = false); + }; + + template <typename TEventType> + class TEventHandle: public IEventHandle { + TEventHandle(); // we never made instance of TEventHandle + public: + TEventType* Get() { + return IEventHandle::Get<TEventType>(); + } + + TAutoPtr<TEventType> Release() { + return IEventHandle::Release<TEventType>(); + } + }; + + static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)"); + + template <typename TEventType, ui32 EventType0> + class TEventBase: public IEventBase { + public: + static constexpr ui32 EventType = EventType0; + ui32 Type() const override { + return EventType0; + } + // still abstract + + typedef TEventHandle<TEventType> THandle; + typedef TAutoPtr<THandle> TPtr; + }; + +#define DEFINE_SIMPLE_LOCAL_EVENT(eventType, header) \ + TString ToStringHeader() const override { \ + return TString(header); \ + } \ + bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \ + Y_FAIL("Local event " #eventType " is not serializable"); \ + } \ + static IEventBase* Load(NActors::TEventSerializedData*) { \ + Y_FAIL("Local event " #eventType " has no load method"); \ + } \ + bool IsSerializable() const override { \ + return false; \ + } + +#define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header) \ + TString ToStringHeader() const override { \ + return TString(header); \ + } \ + bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \ + return true; \ + } \ + static IEventBase* Load(NActors::TEventSerializedData*) { \ + return new eventType(); \ + } \ + bool IsSerializable() const override { \ + return true; \ + } +} |