aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/oldmodule/startsession.cpp
blob: 7c38801d626d44588d26177a75770e16c468f35a (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                                           
         
                                         
 
                                                              
 

                                                                
 



                                                                      
                   
                                        
 
                



                                                             
 






                                                                                 
 

                                 
 






                                       
 
                                 
                                   
                                                            
 
                                                            
 



                                                                                                                      


         
///////////////////////////////////////////////////////////
/// \file
/// \brief Starter session implementation

/// Starter session generates empty messages to insert
/// into local sessions that are registered under the same protocol

/// Starter (will one day) automatically adjust the number
/// of messages in flight to make sure that at least one of the source
/// sessions within the message queue is at the limit (bottleneck)

/// Maximum number of messages that starter will insert into
/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight

#include "startsession.h"

#include "module.h"

#include <library/cpp/messagebus/ybus.h>

namespace NBus {
    void* TBusStarter::_starter(void* data) {
        TBusStarter* pThis = static_cast<TBusStarter*>(data);
        pThis->Starter();
        return nullptr;
    }

    /// Creates the starter for \p module and immediately launches its background thread.
    ///
    /// \param module  module whose StartJob() is called by the Starter() loop
    /// \param config  session configuration; Starter() reads SendTimeout from it
    TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)
        : Module(module)
        , Config(config)
        , StartThread(_starter, this) // thread entry receives this object as its argument
        , Exiting(false)
    {
        // Start only from the body, i.e. after the whole initializer list has run:
        // Starter() reads Module, Config and Exiting once the thread is running.
        StartThread.Start();
    }

    /// Stops the starter by delegating to Shutdown(), which raises the exit
    /// flag and joins the starter thread before returning.
    TBusStarter::~TBusStarter() {
        Shutdown();
    }

    void TBusStarter::Shutdown() {
        {
            TGuard<TMutex> g(ExitLock);
            Exiting = true;
            ExitSignal.Signal();
        }
        StartThread.Join();
    }

    void TBusStarter::Starter() {
        TGuard<TMutex> g(ExitLock);
        while (!Exiting) {
            TAutoPtr<TBusMessage> empty(new TBusMessage(0));

            EMessageStatus status = Module->StartJob(empty);

            if (Config.SendTimeout > 0) {
                ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));
            } else {
                ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());
            }
        }
    }

}