aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/actors/core/event.h
blob: 3517dc7a689fd426815359d8f3daa06be46243e8 (plain) (tree)
1
2
3
4
5
6
7
8
9


                    
                      
                       
 
                                                   
 
                                 
                               
 
                           
                 
                               
                                                                           



                                      
                                                
                                                                                               
                                        
                         




                                                     
                                                                             

                                                         






                                                      
                                                                           
                                                

                                                            
                                                                                      
      
                                       
                               
                               
 
                                                     




                                      
                               
                                                  
                                                           
                                                                   
         

                                              
                                                                                                                        






                                                                              
             
 
                                                                                                             



                                     
                                      


                           
                           





                                                   
                                 
                        
                                

                                 
                                                                                                                              
 

                                                                         
                                                   
                                           
 




                                                                             
 
                          
                             
      






                                                                       
                                                                
                                                         

                                                       


                                                   



                                                                             
           
















                                                                   
                                                                                           
         
 
                                                                                                                               
                                                                                                    







                                         
                       
                                         



                                                                                     
                               
                                       
                                               
                                                                
                                                                    
                                                    







                                         
                                       
                                         




                                                                                     
                                             
                               
                                       
                                               
                                                                
                                                        



                                  
                                          
                                         
                                          

                                   
                                       
                                         
         
 
                                                                 
 
                              
                         



                                                        
             
         
 
                                
         

                               
         












                                                                                                          
                                      

                           
 
                                                              
                      
                                                                                                                   
                
                                                                                                                
         
                                    
 
                                                                                                                                                      
 
                                                                                                              
         
                                                                                                            
         
                                                                                                                                         
         
                                                                                                                                       
         

                                                                
         

                                                               
         

                                                                      

         

                                                                 
           
                           
                                                   
         
                                                       

         
                                                                                                                                               








                                                     
                                                 

                                       


                                                                         
                                                                                
                                                                          
                                                                         
                                                                          

                                                                         
     
 


                                                                         
                                                                                





                                                                         
     
 
#pragma once

#include "defs.h"
#include "actorid.h"
#include "callstack.h"
#include "event_load.h"

#include <library/cpp/actors/wilson/wilson_trace.h>

#include <util/system/hp_timer.h>
#include <util/generic/maybe.h>

namespace NActors {
    class TChunkSerializer;
    class IActor;
    class ISerializerToStream {
    public:
        virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
    };

    class IEventBase
        : TNonCopyable,
          public ISerializerToStream {
    protected:
        // for compatibility with virtual actors
        virtual bool DoExecute(IActor* /*actor*/, std::unique_ptr<IEventHandle> /*eventPtr*/) {
            Y_DEBUG_ABORT_UNLESS(false);
            return false;
        }
    public:
        // actual typing is performed by IEventHandle

        virtual ~IEventBase() {
        }

        bool Execute(IActor* actor, std::unique_ptr<IEventHandle> eventPtr) {
            return DoExecute(actor, std::move(eventPtr));
        }

        virtual TString ToStringHeader() const = 0;
        virtual TString ToString() const {
            return ToStringHeader();
        }
        virtual ui32 CalculateSerializedSize() const {
            return 0;
        }
        virtual ui32 Type() const = 0;
        virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
        virtual bool IsSerializable() const = 0;
        virtual ui32 CalculateSerializedSizeCached() const {
            return CalculateSerializedSize();
        }
        virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
    };

    // fat handle
    class IEventHandle : TNonCopyable {
        struct TOnNondelivery {
            TActorId Recipient;

            TOnNondelivery(const TActorId& recipient)
                : Recipient(recipient)
            {
            }
        };

    public:
        template <typename TEv>
        inline TEv* CastAsLocal() const noexcept {
            auto fits = GetTypeRewrite() == TEv::EventType;

            return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
        }

        template <typename TEventType>
        TEventType* Get() {
            if (Type != TEventType::EventType)
                Y_ABORT("Event type %" PRIu32 " doesn't match the expected type %" PRIu32, Type, TEventType::EventType);

            if (!Event) {
                static TEventSerializedData empty;
                Event.Reset(TEventType::Load(Buffer ? Buffer.Get() : &empty));
            }

            if (Event) {
                return static_cast<TEventType*>(Event.Get());
            }

            Y_ABORT("Failed to Load() event type %" PRIu32 " class %s", Type, TypeName<TEventType>().data());
        }

        template <typename T>
        TAutoPtr<T> Release() {
            TAutoPtr<T> x = Get<T>();
            Y_UNUSED(Event.Release());
            Buffer.Reset();
            return x;
        }

        enum EFlags: ui32 {
            FlagTrackDelivery = 1 << 0,
            FlagForwardOnNondelivery = 1 << 1,
            FlagSubscribeOnSession = 1 << 2,
            FlagUseSubChannel = 1 << 3,
            FlagGenerateUnsureUndelivered = 1 << 4,
            FlagExtendedFormat = 1 << 5,
        };
        using TEventFlags = ui32;

        const ui32 Type;
        const TEventFlags Flags;
        const TActorId Recipient;
        TActorId Sender;
        const ui64 Cookie;
        const TScopeId OriginScopeId = TScopeId::LocallyGenerated; // filled in when the message is received from Interconnect

        // if set, used by ActorSystem/Interconnect to report tracepoints
        NWilson::TTraceId TraceId;

        // filled if feeded by interconnect session
        const TActorId InterconnectSession;

#ifdef ACTORSLIB_COLLECT_EXEC_STATS
        ::NHPTimer::STime SendTime;
#endif

        static const size_t ChannelBits = 12;
        static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;

#ifdef USE_ACTOR_CALLSTACK
        TCallstack Callstack;
#endif
        ui16 GetChannel() const noexcept {
            return Flags >> ChannelShift;
        }

        ui64 GetSubChannel() const noexcept {
            return Flags & FlagUseSubChannel ? Sender.LocalId() : 0ULL;
        }

        static ui32 MakeFlags(ui32 channel, TEventFlags flags) {
            Y_ABORT_UNLESS(channel < (1 << ChannelBits));
            Y_ABORT_UNLESS(flags < (1 << ChannelShift));
            return (flags | (channel << ChannelShift));
        }

    private:
        THolder<IEventBase> Event;
        TIntrusivePtr<TEventSerializedData> Buffer;

        TActorId RewriteRecipient;
        ui32 RewriteType;

        THolder<TOnNondelivery> OnNondeliveryHolder; // only for local events

    public:
        void Rewrite(ui32 typeRewrite, TActorId recipientRewrite) {
            RewriteRecipient = recipientRewrite;
            RewriteType = typeRewrite;
        }

        void DropRewrite() {
            RewriteRecipient = Recipient;
            RewriteType = Type;
        }

        const TActorId& GetRecipientRewrite() const {
            return RewriteRecipient;
        }

        ui32 GetTypeRewrite() const {
            return RewriteType;
        }

        TActorId GetForwardOnNondeliveryRecipient() const {
            return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
        }

        IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, TEventFlags flags = 0, ui64 cookie = 0,
                     const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
            : Type(ev->Type())
            , Flags(flags)
            , Recipient(recipient)
            , Sender(sender)
            , Cookie(cookie)
            , TraceId(std::move(traceId))
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
            , SendTime(0)
#endif
            , Event(ev)
            , RewriteRecipient(Recipient)
            , RewriteType(Type)
        {
            if (forwardOnNondelivery)
                OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
        }

        IEventHandle(ui32 type,
                     TEventFlags flags,
                     const TActorId& recipient,
                     const TActorId& sender,
                     TIntrusivePtr<TEventSerializedData> buffer,
                     ui64 cookie,
                     const TActorId* forwardOnNondelivery = nullptr,
                     NWilson::TTraceId traceId = {})
            : Type(type)
            , Flags(flags)
            , Recipient(recipient)
            , Sender(sender)
            , Cookie(cookie)
            , TraceId(std::move(traceId))
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
            , SendTime(0)
#endif
            , Buffer(std::move(buffer))
            , RewriteRecipient(Recipient)
            , RewriteType(Type)
        {
            if (forwardOnNondelivery)
                OnNondeliveryHolder.Reset(new TOnNondelivery(*forwardOnNondelivery));
        }

        // Special ctor for events from interconnect.
        IEventHandle(const TActorId& session,
                     ui32 type,
                     TEventFlags flags,
                     const TActorId& recipient,
                     const TActorId& sender,
                     TIntrusivePtr<TEventSerializedData> buffer,
                     ui64 cookie,
                     TScopeId originScopeId,
                     NWilson::TTraceId traceId) noexcept
            : Type(type)
            , Flags(flags)
            , Recipient(recipient)
            , Sender(sender)
            , Cookie(cookie)
            , OriginScopeId(originScopeId)
            , TraceId(std::move(traceId))
            , InterconnectSession(session)
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
            , SendTime(0)
#endif
            , Buffer(std::move(buffer))
            , RewriteRecipient(Recipient)
            , RewriteType(Type)
        {
        }

        TIntrusivePtr<TEventSerializedData> GetChainBuffer();
        TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer();

        ui32 GetSize() const {
            if (Buffer) {
                return Buffer->GetSize();
            } else if (Event) {
                return Event->CalculateSerializedSize();
            } else {
                return 0;
            }
        }

        bool HasBuffer() const {
            return bool(Buffer);
        }

        bool HasEvent() const {
            return bool(Event);
        }

        IEventBase* GetBase() {
            if (!Event) {
                if (!Buffer)
                    return nullptr;
                else
                    ythrow TWithBackTrace<yexception>() << "don't know how to load the event from buffer";
            }

            return Event.Get();
        }

        TAutoPtr<IEventBase> ReleaseBase() {
            TAutoPtr<IEventBase> x = GetBase();
            Y_UNUSED(Event.Release());
            Buffer.Reset();
            return x;
        }

        TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
            if (Event)
                return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
            else
                return new IEventHandle(Type, Flags, dest, Sender, Buffer, Cookie, nullptr, std::move(TraceId));
        }

        TString GetTypeName() const;
        TString ToString() const;

        [[nodiscard]] static std::unique_ptr<IEventHandle> Forward(std::unique_ptr<IEventHandle>&& ev, TActorId recipient);
        [[nodiscard]] static std::unique_ptr<IEventHandle> ForwardOnNondelivery(std::unique_ptr<IEventHandle>&& ev, ui32 reason, bool unsure = false);

        [[nodiscard]] static TAutoPtr<IEventHandle> Forward(TAutoPtr<IEventHandle>&& ev, TActorId recipient) {
            return Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release();
        }

        [[nodiscard]] static THolder<IEventHandle> Forward(THolder<IEventHandle>&& ev, TActorId recipient) {
            return THolder(Forward(std::unique_ptr<IEventHandle>(ev.Release()), recipient).release());
        }

        [[nodiscard]] static TAutoPtr<IEventHandle> ForwardOnNondelivery(TAutoPtr<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
            return ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release();
        }

        [[nodiscard]] static THolder<IEventHandle> ForwardOnNondelivery(THolder<IEventHandle>&& ev, ui32 reason, bool unsure = false) {
            return THolder(ForwardOnNondelivery(std::unique_ptr<IEventHandle>(ev.Release()), reason, unsure).release());
        }

        template<typename T>
        static TAutoPtr<T> Release(TAutoPtr<IEventHandle>& ev) {
            return ev->Release<T>();
        }

        template<typename T>
        static TAutoPtr<T> Release(THolder<IEventHandle>& ev) {
            return ev->Release<T>();
        }

        template <typename TEv>
        inline TEv* StaticCastAsLocal() const noexcept { // blind cast
            return static_cast<TEv*>(Event.Get());
        }
    };

    template <typename TEventType>
    class TEventHandle: public IEventHandle {
        TEventHandle(); // we never made instance of TEventHandle
    public:
        TEventType* Get() {
            return IEventHandle::Get<TEventType>();
        }

        TAutoPtr<TEventType> Release() {
            return IEventHandle::Release<TEventType>();
        }
    };

    static_assert(sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle), "expect sizeof(TEventHandle<IEventBase>) == sizeof(IEventHandle)");

    template <typename TEventType, ui32 EventType0>
    class TEventBase: public IEventBase {
    public:
        static constexpr ui32 EventType = EventType0;
        ui32 Type() const override {
            return EventType0;
        }
        // still abstract

        typedef TEventHandle<TEventType> THandle;
        typedef TAutoPtr<THandle> TPtr;
    };

#define DEFINE_SIMPLE_LOCAL_EVENT(eventType, header)                    \
    TString ToStringHeader() const override {                           \
        return TString(header);                                         \
    }                                                                   \
    bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
        Y_ABORT("Local event " #eventType " is not serializable");       \
    }                                                                   \
    static IEventBase* Load(NActors::TEventSerializedData*) {           \
        Y_ABORT("Local event " #eventType " has no load method");        \
    }                                                                   \
    bool IsSerializable() const override {                              \
        return false;                                                   \
    }

#define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header)                 \
    TString ToStringHeader() const override {                           \
        return TString(header);                                         \
    }                                                                   \
    bool SerializeToArcadiaStream(NActors::TChunkSerializer*) const override { \
        return true;                                                    \
    }                                                                   \
    static IEventBase* Load(NActors::TEventSerializedData*) {           \
        return new eventType();                                         \
    }                                                                   \
    bool IsSerializable() const override {                              \
        return true;                                                    \
    }
}