aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/oldmodule/module.cpp
blob: 4ec37a4d53f2d409ae556ef67c7d0df4fc446a30 (plain) (tree)
1
2
3
4
5
6
7
8
                   
 




                                                                

                                   



                               
 
           
                                 
 
                                   
 








                                                                   
 











                                             
     
                                                       
 
                                 
         
                     


     
                

                           
 



                                                        
             
 


                                                                                          
 
                                         
 




                                                        
 
                                                            
 
                                         
 
                                                          
 
                                    
 
                                 
 
                                   
 





                                                       
 



                         
 





                                                            
 


                                                         
 









                                                                     
 

                                                             
                                                                                                                  
 
                                 
 
                                             
                 
                                                             
             
 
                                                                                  
 
                                               
 
                                             
 


                                                                
 

                                           
 
                            
 
                                                                                
             
 

                                                                                
 

                                                   
 

                                                
 



                                                    
 


                                                                               
 


                                    
 





                                                                                                   
 



                                                                                          
 
                                                            
 






                                                                                   
 

                                                                               
 

                                                                                                            
 





                                                              
             


                                                                           
 




                                                                                       
 
                                                    
 
                                                                                     
 










                                                       
                 

                                                                                                             
 
                                                                                          
 


                                                                            
 
                                       
 


                                                                            
 


                                                
 



                                        
             
          
 
     
 
                                                            
     

                                                                      
 









                                                              
 

                                      
 
                                
 
                                                  
                                           
                                                     
 

                                           
 
                                                             
 









                                                                                         
 



                                                             
                        
                                                                                                                                                                                      
                 


                                                                      
 



                                                                                           
 

                                                    
                        
                                                   
                 
             
 








                                                                                                    
                    

                                                    
             
         
                                  
 



                                                        
         
                     
     

                                                                                                          
     

                                                                                             
 
                                                             
     
 



                                                     
 


                                       
 
                                                             
 

                                   
 



                              
 
                                                                                                          
                                                                                                            
 
                        
 

                                                                                                 
 
                                                                                   
         
     
 






                                                                                                 
         
 

                                      
         
 



                                           
 




                                                                                                                    
         
 
                                        
 

                                                  
 

                 


                                                                                                                        
 

                                                                                                                               
 
                                                                                                                                              
 

                                                                                                                             
 
                                                                                                          
 

                                                                                                                 
 
                                                                                             
 

                                                                                                                   
 


                                                                   
 
                                                                  

                              
 



                                                                  
 

                                                                 
 
                        
     
 







                                                                 
         
                    
     


                                           
 
                                           
 
                                                                                           
 
                                          
 






                                                                                         
                                                                                  





                                                               
     






                                                                                    
                                                      
                         
 
                                                                          
 



                                                                                             
     








                                                                                             
     


                                        
 


                                               
 


                                              
 
                               
 

                                             
 

                                                                
 
                                   
                                                                              
                                              
 





                                                                         
     
 
                                 
 
                    
 


                                                                  
 
       






                                                                                           


                                                                     
 

                                                      
 

                                                                 
 












                                                                                                                   
     













                                                                                                                   
     


                                                                                                       
     

                                                                                   
     



                                                        
 
 
                                                    




                                                                                      
                                                  
                                                                        


                                                                                              
                                                              
                                            
                                          

                                            
         
     
 
                                                             
 
 
                                                                                                
                          












                                                                                  
                                                      
               
                                              
 
                                                
     
                                                






                                                                                                
                                




                                        
 
                                                                    
                                                           
 
                                                                                       








                                                              
                                            


                                                                                           
                                             
                                                                                            
 
 
                                                                           
                                             
                                                                                     
                 
 


                                                                                      
                                                 
                                                                                     

                     


                                                                                         
#include "module.h"

#include <library/cpp/messagebus/scheduler_actor.h>
#include <library/cpp/messagebus/thread_extra.h>
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/queue_in_actor.h>
#include <library/cpp/messagebus/actor/what_thread_does.h>
#include <library/cpp/messagebus/actor/what_thread_does_guard.h>

#include <util/generic/singleton.h>
#include <util/string/printf.h>
#include <util/system/event.h>

using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;

namespace {
    Y_POD_STATIC_THREAD(TBusJob*)
    ThreadCurrentJob;

    struct TThreadCurrentJobGuard {
        TBusJob* Prev;

        TThreadCurrentJobGuard(TBusJob* job)
            : Prev(ThreadCurrentJob)
        {
            Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job);
            ThreadCurrentJob = job;
        }
        ~TThreadCurrentJobGuard() {
            ThreadCurrentJob = Prev;
        }
    };

    void ClearState(NBus::TJobState* state) {
        /// skip sendbacks handlers
        if (state->Message != state->Reply) {
            if (state->Message) {
                delete state->Message;
                state->Message = nullptr;
            }

            if (state->Reply) {
                delete state->Reply;
                state->Reply = nullptr;
            }
        }
    }

    void ClearJobStateVector(NBus::TJobStateVec* vec) {
        Y_ASSERT(vec);

        for (auto& call : *vec) {
            ClearState(&call);
        }

        vec->clear();
    }

}

namespace NBus {
    namespace NPrivate {
        class TJobStorage {
        };

        struct TModuleClientHandler
           : public IBusClientHandler {
            TModuleClientHandler(TBusModuleImpl* module)
                : Module(module)
            {
            }

            void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override;
            void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
            void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override;
            void OnClientConnectionEvent(const TClientConnectionEvent& event) override;

            TBusModuleImpl* const Module;
        };

        struct TModuleServerHandler
           : public IBusServerHandler {
            TModuleServerHandler(TBusModuleImpl* module)
                : Module(module)
            {
            }

            void OnMessage(TOnMessageContext& msg) override;

            TBusModuleImpl* const Module;
        };

        struct TBusModuleImpl: public TBusModuleInternal {
            TBusModule* const Module;

            TBusMessageQueue* Queue;

            TScheduler Scheduler;

            const char* const Name;

            typedef TList<TJobRunner*> TBusJobList;
            /// jobs currently in-flight on this module
            TBusJobList Jobs;
            /// module level mutex
            TMutex Lock;
            TCondVar ShutdownCondVar;
            TAtomic JobCount;

            enum EState {
                CREATED,
                RUNNING,
                STOPPED,
            };

            TAtomic State;
            TBusModuleConfig ModuleConfig;
            TBusServerSessionPtr ExternalSession;
            /// protocol for local proxy session
            THolder<IBusClientHandler> ModuleClientHandler;
            THolder<IBusServerHandler> ModuleServerHandler;
            TVector<TSimpleSharedPtr<TBusStarter>> Starters;

            // Sessions must be destroyed before
            // ModuleClientHandler / ModuleServerHandler
            TVector<TBusClientSessionPtr> ClientSessions;
            TVector<TBusServerSessionPtr> ServerSessions;

            TBusModuleImpl(TBusModule* module, const char* name)
                : Module(module)
                , Queue()
                , Name(name)
                , JobCount(0)
                , State(CREATED)
                , ExternalSession(nullptr)
                , ModuleClientHandler(new TModuleClientHandler(this))
                , ModuleServerHandler(new TModuleServerHandler(this))
            {
            }

            ~TBusModuleImpl() override {
                // Shutdown cannot be called from destructor,
                // because module has virtual methods.
                Y_ABORT_UNLESS(State != RUNNING, "if running, must explicitly call Shutdown() before destructor");

                Scheduler.Stop();

                while (!Jobs.empty()) {
                    DestroyJob(Jobs.front());
                }
                Y_ABORT_UNLESS(JobCount == 0, "state check");
            }

            void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&);

            void AddJob(TJobRunner* jobRunner);

            void DestroyJob(TJobRunner* job);

            /// terminate job on this message
            void CancelJob(TBusJob* job, EMessageStatus status);
            /// prints statuses of jobs
            TString GetStatus(unsigned flags);

            size_t Size() const {
                return AtomicGet(JobCount);
            }

            void Shutdown();

            TVector<TBusClientSessionPtr> GetClientSessionsInternal() override {
                return ClientSessions;
            }

            TVector<TBusServerSessionPtr> GetServerSessionsInternal() override {
                return ServerSessions;
            }

            TBusMessageQueue* GetQueue() override {
                return Queue;
            }

            TString GetNameInternal() override {
                return Name;
            }

            TString GetStatusSingleLine() override {
                TStringStream ss;
                ss << "jobs: " << Size();
                return ss.Str();
            }

            void OnClientConnectionEvent(const TClientConnectionEvent& event) {
                Module->OnClientConnectionEvent(event);
            }
        };

        struct TJobResponseMessage {
            TBusMessage* Request;
            TBusMessage* Response;
            EMessageStatus Status;

            TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status)
                : Request(request)
                , Response(response)
                , Status(status)
            {
            }
        };

        struct TJobRunner: public TAtomicRefCount<TJobRunner>,
                            public NActor::TActor<TJobRunner>,
                            public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
                            public TScheduleActor<TJobRunner> {
            THolder<TBusJob> Job;

            TList<TJobRunner*>::iterator JobStorageIterator;

            TJobRunner(TAutoPtr<TBusJob> job)
                : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor())
                , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler)
                , Job(job.Release())
                , JobStorageIterator()
            {
                Job->Runner = this;
            }

            ~TJobRunner() override {
                Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator());
            }

            void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) {
                Job->CallReplyHandler(message.Status, message.Request, message.Response);
            }

            void Destroy() {
                if (!!Job->OnMessageContext) {
                    if (!Job->ReplySent) {
                        Job->OnMessageContext.ForgetRequest();
                    }
                }
                Job->ModuleImpl->DestroyJob(this);
            }

            void Act(NActor::TDefaultTag) {
                if (JobStorageIterator == TList<TJobRunner*>::iterator()) {
                    return;
                }

                if (Job->SleepUntil != 0) {
                    if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) {
                        Destroy();
                        return;
                    }
                }

                TThreadCurrentJobGuard g(Job.Get());

                NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll();

                if (Alarm.FetchTask()) {
                    if (Job->AnyPendingToSend()) {
                        Y_ASSERT(Job->SleepUntil == 0);
                        Job->SendPending();
                        if (Job->AnyPendingToSend()) {
                        }
                    } else {
                        // regular alarm
                        Y_ASSERT(Job->Pending.empty());
                        Y_ASSERT(Job->SleepUntil != 0);
                        Job->SleepUntil = 0;
                    }
                }

                for (;;) {
                    if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) {
                        TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");

                        Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message);
                    }

                    if (Job->SleepUntil != 0) {
                        ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil));
                        return;
                    }

                    Job->SendPending();

                    if (Job->AnyPendingToSend()) {
                        ScheduleAt(TInstant::Now() + TDuration::Seconds(1));
                        return;
                    }

                    if (!Job->Pending.empty()) {
                        // waiting replies
                        return;
                    }

                    if (Job->IsDone()) {
                        Destroy();
                        return;
                    }
                }
            }
        };

    }

    static inline TJobRunner* GetJob(TBusMessage* message) {
        return (TJobRunner*)message->Data;
    }

    static inline void SetJob(TBusMessage* message, TJobRunner* job) {
        message->Data = job;
    }

    TBusJob::TBusJob(TBusModule* module, TBusMessage* message)
        : Status(MESSAGE_OK)
        , Runner()
        , Message(message)
        , ReplySent(false)
        , Module(module)
        , ModuleImpl(module->Impl.Get())
        , SleepUntil(0)
    {
        Handler = TJobHandler(&TBusModule::Start);
    }

    TBusJob::~TBusJob() {
        Y_ASSERT(Pending.size() == 0);
        //Y_ASSERT(SleepUntil == 0);

        ClearAllMessageStates();
    }

    TNetAddr TBusJob::GetPeerAddrNetAddr() const {
        Y_ABORT_UNLESS(!!OnMessageContext);
        return OnMessageContext.GetPeerAddrNetAddr();
    }

    void TBusJob::CheckThreadCurrentJob() {
        Y_ASSERT(ThreadCurrentJob == this);
    }

    /////////////////////////////////////////////////////////
    /// \brief Send messages in pending list

    /// If at least one message is gone return true
    /// If message has not been send, move it to Finished with appropriate error code
    bool TBusJob::SendPending() {
        // Iterator type must be size_t, not vector::iterator,
        // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector,
        // that in turn invalidates iterator.
        // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending`
        // (not erases, and not inserts) so iteration by index is valid.
        size_t it = 0;
        while (it != Pending.size()) {
            TJobState& call = Pending[it];

            if (call.Status == MESSAGE_DONT_ASK) {
                EMessageStatus getAddressStatus = MESSAGE_OK;
                TNetAddr addr;
                if (call.UseAddr) {
                    addr = call.Addr;
                } else {
                    getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr);
                }

                if (getAddressStatus == MESSAGE_OK) {
                    // hold extra reference for each request in flight
                    Runner->Ref();

                    if (call.OneWay) {
                        call.Status = call.Session->SendMessageOneWay(call.Message, &addr);
                    } else {
                        call.Status = call.Session->SendMessage(call.Message, &addr);
                    }

                    if (call.Status != MESSAGE_OK) {
                        Runner->UnRef();
                    }

                } else {
                    call.Status = getAddressStatus;
                }
            }

            if (call.Status == MESSAGE_OK) {
                ++it; // keep pending list until we get reply
            } else if (call.Status == MESSAGE_BUSY) {
                Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight");
            } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) {
                ++it; // try up to call.MaxRetries times to send message
                call.NumRetries++;
                DoCallReplyHandler(call);
                call.Status = MESSAGE_DONT_ASK;
                call.Message->Reset(); // generate new Id
            } else {
                Finished.push_back(call);
                DoCallReplyHandler(call);
                Pending.erase(Pending.begin() + it);
            }
        }
        return Pending.size() > 0;
    }

    bool TBusJob::AnyPendingToSend() {
        for (unsigned i = 0; i < Pending.size(); ++i) {
            if (Pending[i].Status == MESSAGE_DONT_ASK) {
                return true;
            }
        }

        return false;
    }

    bool TBusJob::IsDone() {
        bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
        return r;
    }

    void TBusJob::CallJobHandlerOnly() {
        TThreadCurrentJobGuard threadCurrentJobGuard(this);
        TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");

        Handler = Handler(ModuleImpl->Module, this, Message);
    }

    bool TBusJob::CallJobHandler() {
        /// go on as far as we can go without waiting
        while (!IsDone()) {
            /// call the handler
            CallJobHandlerOnly();

            /// quit if job is canceled
            if (Status != MESSAGE_OK) {
                break;
            }

            /// there are messages to send and wait for reply
            SendPending();

            if (!Pending.empty()) {
                break;
            }

            /// asked to sleep
            if (SleepUntil) {
                break;
            }
        }

        Y_ABORT_UNLESS(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent),
                 "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d",
                 Message->GetHeader()->Id, Message->GetHeader()->Type);

        return IsDone();
    }

    void TBusJob::DoCallReplyHandler(TJobState& call) {
        if (call.Handler) {
            TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)");

            TThreadCurrentJobGuard threadCurrentJobGuard(this);
            (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
        }
    }

    int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
        /// find handler for given message and update it's status
        size_t i = 0;
        for (; i < Pending.size(); ++i) {
            TJobState& call = Pending[i];
            if (call.Message == mess) {
                break;
            }
        }

        /// if not found, report error
        if (i == Pending.size()) {
            Y_FAIL("must not happen");
        }

        /// fill in response into job state
        TJobState& call = Pending[i];
        call.Status = status;
        Y_ASSERT(call.Message == mess);
        call.Reply = reply;

        if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
            call.NumRetries++;
            call.Status = MESSAGE_DONT_ASK;
            call.Message->Reset(); // generate new Id
            DoCallReplyHandler(call);
            return 0;
        }

        /// call the handler if provided
        DoCallReplyHandler(call);

        /// move job state into the finished stack
        Finished.push_back(Pending[i]);
        Pending.erase(Pending.begin() + i);

        return 0;
    }

    ///////////////////////////////////////////////////////////////
    /// send message to any other session or application
    void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
        CheckThreadCurrentJob();

        SetJob(mess.Get(), Runner);
        Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
    }

    void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
        CheckThreadCurrentJob();

        SetJob(mess.Get(), Runner);
        Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
    }

    void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
        CheckThreadCurrentJob();

        SetJob(req.Get(), Runner);
        Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
    }

    void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) {
        CheckThreadCurrentJob();

        SetJob(req.Get(), Runner);
        Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true));
    }

    ///////////////////////////////////////////////////////////////
    /// send reply to the starter message
    void TBusJob::SendReply(TBusMessageAutoPtr reply) {
        CheckThreadCurrentJob();

        Y_ABORT_UNLESS(!ReplySent, "cannot call SendReply twice");
        ReplySent = true;
        if (!OnMessageContext)
            return;

        EMessageStatus ok = OnMessageContext.SendReplyMove(reply);
        if (ok != MESSAGE_OK) {
            // TODO: count errors
        }
    }

    /// set the flag to terminate job at the earliest convenience
    void TBusJob::Cancel(EMessageStatus status) {
        CheckThreadCurrentJob();

        Status = status;
    }

    void TBusJob::ClearState(TJobState& call) {
        TJobStateVec::iterator it;
        for (it = Finished.begin(); it != Finished.end(); ++it) {
            TJobState& state = *it;
            if (&call == &state) {
                ::ClearState(&call);
                Finished.erase(it);
                return;
            }
        }
        Y_ASSERT(0);
    }

    void TBusJob::ClearAllMessageStates() {
        ClearJobStateVector(&Finished);
        ClearJobStateVector(&Pending);
    }

    void TBusJob::Sleep(int milliSeconds) {
        CheckThreadCurrentJob();

        Y_ABORT_UNLESS(Pending.empty(), "sleep is not allowed when there are pending job");
        Y_ABORT_UNLESS(SleepUntil == 0, "must not override sleep");

        SleepUntil = Now() + milliSeconds;
    }

    TString TBusJob::GetStatus(unsigned flags) {
        TString strReturn;
        strReturn += Sprintf("  job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
                             Message->GetHeader()->Id,
                             (int)Message->GetHeader()->Type,
                             (int)(Now() - Message->GetHeader()->SendTime) / 1000,
                             (int)Pending.size(),
                             (int)Finished.size(),
                             Status != MESSAGE_OK ? ToString(Status).data() : "");

        TJobStateVec::iterator it;
        for (it = Pending.begin(); it != Pending.end(); ++it) {
            TJobState& call = *it;
            strReturn += call.GetStatus(flags);
        }
        return strReturn;
    }

    TString TJobState::GetStatus(unsigned flags) {
        Y_UNUSED(flags);
        TString strReturn;
        strReturn += Sprintf("    pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n",
                             Message->GetHeader()->Id,
                             (int)Message->GetHeader()->Type,
                             Session->GetProto()->GetService(),
                             (int)(Now() - Message->GetHeader()->SendTime) / 1000,
                             ToString(Status).data());
        return strReturn;
    }

    //////////////////////////////////////////////////////////////////////

    void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) {
        TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob");
        if (job) {
            job->Cancel(status);
        }
    }

    TString TBusModuleImpl::GetStatus(unsigned flags) {
        Y_UNUSED(flags);
        TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
        TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size());
        for (auto job : Jobs) {
            //strReturn += job->Job->GetStatus(flags);
            Y_UNUSED(job);
            strReturn += "TODO\n";
        }
        return strReturn;
    }

    TBusModuleConfig::TBusModuleConfig()
        : StarterMaxInFlight(1000)
    {
    }

    TBusModuleConfig::TSecret::TSecret()
        : SchedulePeriod(TDuration::Seconds(1))
    {
    }

    TBusModule::TBusModule(const char* name)
        : Impl(new TBusModuleImpl(this, name))
    {
    }

    TBusModule::~TBusModule() {
    }

    const char* TBusModule::GetName() const {
        return Impl->Name;
    }

    void TBusModule::SetConfig(const TBusModuleConfig& config) {
        Impl->ModuleConfig = config;
    }

    bool TBusModule::StartInput() {
        Y_ABORT_UNLESS(Impl->State == TBusModuleImpl::CREATED, "state check");
        Y_ABORT_UNLESS(!!Impl->Queue, "state check");
        Impl->State = TBusModuleImpl::RUNNING;

        Y_ASSERT(!Impl->ExternalSession);
        TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue);
        if (extSession != nullptr) {
            Impl->ExternalSession = extSession;
        }

        return true;
    }

    bool TBusModule::Shutdown() {
        Impl->Shutdown();

        return true;
    }

    TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) {
        TBusJob* job = new TBusJob(this, message);
        return job;
    }

    /**
Example for external session creation:

TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
    TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig);
    session->RegisterService(hostname, begin, end);
    return session;
*/

    bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) {
        Impl->Queue = queue;
        return true;
    }

    int TBusModule::GetModuleSessionInFlight() const {
        return Impl->Size();
    }

    TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() {
        return Impl.Get();
    }

    TBusServerSessionPtr TBusModule::CreateDefaultDestination(
        TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) {
        TBusServerSessionConfig patchedConfig = config;
        patchedConfig.ExecuteOnMessageInWorkerPool = false;
        if (!patchedConfig.Name) {
            patchedConfig.Name = name;
        }
        if (!patchedConfig.Name) {
            patchedConfig.Name = Impl->Name;
        }
        TBusServerSessionPtr session =
            TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue);
        Impl->ServerSessions.push_back(session);
        return session;
    }

    TBusClientSessionPtr TBusModule::CreateDefaultSource(
        TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) {
        TBusClientSessionConfig patchedConfig = config;
        patchedConfig.ExecuteOnReplyInWorkerPool = false;
        if (!patchedConfig.Name) {
            patchedConfig.Name = name;
        }
        if (!patchedConfig.Name) {
            patchedConfig.Name = Impl->Name;
        }
        TBusClientSessionPtr session =
            TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue);
        Impl->ClientSessions.push_back(session);
        return session;
    }

    TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) {
        TBusStarter* session = new TBusStarter(this, config);
        Impl->Starters.push_back(session);
        return session;
    }

    void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) {
        Y_UNUSED(event);
    }

    TString TBusModule::GetStatus(unsigned flags) {
        TString strReturn = Sprintf("%s\n", Impl->Name);
        strReturn += Impl->GetStatus(flags);
        return strReturn;
    }

}

void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
    TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
    Jobs.push_back(jobRunner);
    jobRunner->JobStorageIterator = Jobs.end();
    --jobRunner->JobStorageIterator;
}

void TBusModuleImpl::DestroyJob(TJobRunner* job) {
    Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator());

    {
        TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob");
        int jobCount = AtomicDecrement(JobCount);
        Y_ABORT_UNLESS(jobCount >= 0, "decremented too much");
        Jobs.erase(job->JobStorageIterator);

        if (AtomicGet(State) == STOPPED) {
            if (jobCount == 0) {
                ShutdownCondVar.BroadCast();
            }
        }
    }

    job->JobStorageIterator = TList<TJobRunner*>::iterator();
}

void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
    TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
    Y_ABORT_UNLESS(!!msg);

    THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg)));
    jobRunner->Job->MessageHolder.Reset(msg0.Release());
    jobRunner->Job->OnMessageContext.Swap(context);
    SetJob(jobRunner->Job->Message, jobRunner.Get());

    AtomicIncrement(JobCount);

    AddJob(jobRunner.Get());

    jobRunner.Release()->Schedule();
}

void TBusModuleImpl::Shutdown() {
    if (AtomicGet(State) != TBusModuleImpl::RUNNING) {
        AtomicSet(State, TBusModuleImpl::STOPPED);
        return;
    }
    AtomicSet(State, TBusModuleImpl::STOPPED);

    for (auto& clientSession : ClientSessions) {
        clientSession->Shutdown();
    }
    for (auto& serverSession : ServerSessions) {
        serverSession->Shutdown();
    }

    for (size_t starter = 0; starter < Starters.size(); ++starter) {
        Starters[starter]->Shutdown();
    }

    {
        TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown");
        for (auto& Job : Jobs) {
            Job->Schedule();
        }

        while (!Jobs.empty()) {
            ShutdownCondVar.WaitI(Lock);
        }
    }
}

EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) {
    Y_ABORT_UNLESS(Impl->State == TBusModuleImpl::RUNNING);
    Y_ABORT_UNLESS(!!Impl->Queue);

    if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) {
        return MESSAGE_BUSY;
    }

    TOnMessageContext dummy;
    Impl->OnMessageReceived(message.Release(), dummy);

    return MESSAGE_OK;
}

void TModuleServerHandler::OnMessage(TOnMessageContext& msg) {
    Module->OnMessageReceived(nullptr, msg);
}

void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) {
    TJobRunner* job = GetJob(req.Get());
    Y_ASSERT(job);
    Y_ASSERT(job->Job->Message != req.Get());
    job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK));
    job->UnRef();
}

void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) {
    TJobRunner* job = GetJob(req.Get());
    Y_ASSERT(job);
    Y_ASSERT(job->Job->Message != req.Get());
    job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK));
    job->UnRef();
}

void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) {
    TJobRunner* job = GetJob(msg.Get());
    if (job) {
        Y_ASSERT(job->Job->Message != msg.Get());
        job->EnqueueAndSchedule(TJobResponseMessage(msg.Release(), nullptr, status));
        job->UnRef();
    }
}

void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) {
    Module->OnClientConnectionEvent(event);
}