diff options
author | vladimir <vladimir@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
commit | 3e7ff6e4ee637c04455854159e84850e613ebc16 (patch) | |
tree | 1ea1786a47f104a0657e0f935ce63dcaeec3fd26 /library/cpp/messagebus/oldmodule | |
parent | dad82c0e0157ebad6bfd7cf0e5fb3c15c42922b3 (diff) | |
download | ydb-3e7ff6e4ee637c04455854159e84850e613ebc16.tar.gz |
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule')
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 56 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.h | 102 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.cpp | 40 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.h | 14 |
4 files changed, 106 insertions, 106 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..3abcbfc87b 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -1,5 +1,5 @@ #include "module.h" - + #include <library/cpp/messagebus/scheduler_actor.h> #include <library/cpp/messagebus/thread_extra.h> #include <library/cpp/messagebus/actor/actor.h> @@ -64,7 +64,7 @@ namespace NBus { namespace NPrivate { class TJobStorage { }; - + struct TModuleClientHandler : public IBusClientHandler { TModuleClientHandler(TBusModuleImpl* module) @@ -334,16 +334,16 @@ namespace NBus { ClearAllMessageStates(); } - + TNetAddr TBusJob::GetPeerAddrNetAddr() const { Y_VERIFY(!!OnMessageContext); return OnMessageContext.GetPeerAddrNetAddr(); } - + void TBusJob::CheckThreadCurrentJob() { Y_ASSERT(ThreadCurrentJob == this); } - + ///////////////////////////////////////////////////////// /// \brief Send messages in pending list @@ -405,17 +405,17 @@ namespace NBus { } return Pending.size() > 0; } - + bool TBusJob::AnyPendingToSend() { for (unsigned i = 0; i < Pending.size(); ++i) { if (Pending[i].Status == MESSAGE_DONT_ASK) { return true; } - } + } return false; - } - + } + bool TBusJob::IsDone() { bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); return r; @@ -427,7 +427,7 @@ namespace NBus { Handler = Handler(ModuleImpl->Module, this, Message); } - + bool TBusJob::CallJobHandler() { /// go on as far as we can go without waiting while (!IsDone()) { @@ -465,9 +465,9 @@ namespace NBus { TThreadCurrentJobGuard threadCurrentJobGuard(this); (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); - } + } } - + int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { /// find handler for given message and update it's status size_t i = 0; @@ -476,34 +476,34 @@ namespace NBus { if (call.Message == mess) { break; } - } + } /// if not found, report error if (i == Pending.size()) { Y_FAIL("must not happen"); } - + /// fill in response into job state TJobState& call = Pending[i]; call.Status = status; Y_ASSERT(call.Message == mess); call.Reply = reply; - + if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { call.NumRetries++; call.Status = MESSAGE_DONT_ASK; call.Message->Reset(); // generate new Id DoCallReplyHandler(call); return 0; - } + } /// call the handler if provided DoCallReplyHandler(call); - + /// move job state into the finished stack Finished.push_back(Pending[i]); Pending.erase(Pending.begin() + i); - + return 0; } @@ -511,21 +511,21 @@ namespace NBus { /// send message to any other session or application void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { CheckThreadCurrentJob(); - + SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); } void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { CheckThreadCurrentJob(); - + SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); } void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { CheckThreadCurrentJob(); - + SetJob(req.Get(), Runner); Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); } @@ -559,7 +559,7 @@ namespace NBus { Status = status; } - + void TBusJob::ClearState(TJobState& call) { TJobStateVec::iterator it; for (it = Finished.begin(); it != Finished.end(); ++it) { @@ -569,10 +569,10 @@ namespace NBus { Finished.erase(it); return; } - } + } Y_ASSERT(0); - } - + } + void TBusJob::ClearAllMessageStates() { ClearJobStateVector(&Finished); ClearJobStateVector(&Pending); @@ -586,7 +586,7 @@ namespace NBus { SleepUntil = Now() + milliSeconds; } - + TString TBusJob::GetStatus(unsigned flags) { TString strReturn; strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", @@ -624,8 +624,8 @@ namespace NBus { if (job) { job->Cancel(status); } - } - + } + TString TBusModuleImpl::GetStatus(unsigned flags) { Y_UNUSED(flags); TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index 8d1c4a5d52..e263d1b618 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -1,55 +1,55 @@ #pragma once -/////////////////////////////////////////////////////////////////////////// -/// \file -/// \brief Application interface for modules +/////////////////////////////////////////////////////////////////////////// +/// \file +/// \brief Application interface for modules -/// NBus::TBusModule provides foundation for implementation of asynchnous -/// modules that communicate with multiple external or local sessions +/// NBus::TBusModule provides foundation for implementation of asynchnous +/// modules that communicate with multiple external or local sessions /// NBus::TBusSession. /// To implement the module some virtual functions needs to be overridden: /// NBus::TBusModule::CreateExtSession() creates and registers an -/// external session that receives incoming messages as input for module +/// external session that receives incoming messages as input for module /// processing. /// When new incoming message arrives the new NBus::TBusJob is created. -/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state +/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state /// during processing of one incoming message. Default implementation of /// NBus::TBusJob will maintain all send and received messages during /// lifetime of this job. Each message, status and reply can be found -/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module -/// needs to maintain an additional information during lifetime of the job +/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module +/// needs to maintain an additional information during lifetime of the job /// you can derive your own class from NBus::TBusJob and override job /// factory method NBus::IJobFactory::CreateJobInstance() to create your instances. /// Processing of a given message starts with a call to NBus::TBusModule::Start() /// handler that should be overridden in the module implementation. Within -/// the callback handler module can perform any computation and access any -/// datastore tables that it needs. The handler can also access any module +/// the callback handler module can perform any computation and access any +/// datastore tables that it needs. The handler can also access any module /// variables. However, same handler can be called from multiple threads so, /// it is recommended that handler only access read-only module level variables. /// Handler should use NBus::TBusJob::Send() to send messages to other client -/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main +/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main /// job message. When handler is done, it returns the pointer to the next handler to call -/// when all pending messages have cleared. If handler -/// returns pointer to itself the module will reschedule execution of this handler -/// for a later time. This should be done in case NBus::TBusJob::Send() returns -/// error (not MESSAGE_OK) - +/// when all pending messages have cleared. If handler +/// returns pointer to itself the module will reschedule execution of this handler +/// for a later time. This should be done in case NBus::TBusJob::Send() returns +/// error (not MESSAGE_OK) + #include "startsession.h" #include <library/cpp/messagebus/ybus.h> - + #include <util/generic/noncopyable.h> #include <util/generic/object_counter.h> -namespace NBus { +namespace NBus { class TBusJob; class TBusModule; - + namespace NPrivate { struct TCallJobHandlerWorkItem; struct TBusModuleImpl; @@ -57,7 +57,7 @@ namespace NBus { struct TModuleClientHandler; struct TJobRunner; } - + class TJobHandler { protected: typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess); @@ -86,12 +86,12 @@ namespace NBus { return (b->*MyPtr)(job, mess); } }; - + typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply); - + //////////////////////////////////////////////////// /// \brief Pending message state - + struct TJobState { friend class TBusJob; friend class ::TCrawlerModule; @@ -131,9 +131,9 @@ namespace NBus { public: TString GetStatus(unsigned flags); }; - + using TJobStateVec = TVector<TJobState>; - + ///////////////////////////////////////////////////////// /// \brief Execution item = thread @@ -147,10 +147,10 @@ namespace NBus { public: /// given a module and starter message TBusJob(TBusModule* module, TBusMessage* message); - + /// destructor will free all the message that were send and received virtual ~TBusJob(); - + TBusMessage* GetMessage() const { return Message; } @@ -161,13 +161,13 @@ namespace NBus { /// If addr is set then use it as destination. void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr); void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0); - + void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr); void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session); /// send reply to the starter message virtual void SendReply(TBusMessageAutoPtr reply); - + /// set the flag to terminate job at the earliest convenience void Cancel(EMessageStatus status); @@ -179,7 +179,7 @@ namespace NBus { void PutState(const TJobState& state) { Finished.push_back(state); } - + public: /// retrieve all pending messages void GetPending(TJobStateVec* stateVec) { @@ -225,10 +225,10 @@ namespace NBus { } return static_cast<MessageType*>(call.Message); } - } + } return nullptr; - } - + } + /// helper function to find status for previously sent message template <class MessageType> EMessageStatus GetStatus(int* startFrom = nullptr) { @@ -240,10 +240,10 @@ namespace NBus { } return call.Status; } - } + } return MESSAGE_UNKNOWN; - } - + } + /// helper function to clear state of previosly sent messages template <class MessageType> void Clear() { @@ -256,24 +256,24 @@ namespace NBus { } else { ++i; } - } - } - + } + } + /// helper function to clear state in order to try again void ClearState(TJobState& state); - + /// clears all message states void ClearAllMessageStates(); /// returns true if job is done bool IsDone(); - + /// return human reabable status of this job virtual TString GetStatus(unsigned flags); /// set sleep time for job void Sleep(int milliSeconds); - + void CallJobHandlerOnly(); private: @@ -297,7 +297,7 @@ namespace NBus { TOnMessageContext OnMessageContext; // starter public: bool ReplySent; - + private: friend class TBusModule; friend struct NPrivate::TBusModuleImpl; @@ -312,7 +312,7 @@ namespace NBus { NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep }; - + //////////////////////////////////////////////////////////////////// /// \brief Classes to implement basic module functionality @@ -358,18 +358,18 @@ namespace NBus { friend class TBusJob; TObjectCounter<TBusModule> ObjectCounter; - + TIntrusivePtr<NPrivate::TBusModuleImpl> Impl; - + public: /// Each module should have a name which is used as protocol service TBusModule(const char* name); ~TBusModule() override; const char* GetName() const; - + void SetConfig(const TBusModuleConfig& config); - + /// get status of all jobs in flight TString GetStatus(unsigned flags = 0); @@ -380,7 +380,7 @@ namespace NBus { // this default implementation just creates TBusJob object TBusJob* CreateJobInstance(TBusMessage* message) override; - + EMessageStatus StartJob(TAutoPtr<TBusMessage> message); /// creates private sessions, calls CreateExtSession(), should be called before StartInput() @@ -391,7 +391,7 @@ namespace NBus { public: /// entry point into module, first function to call virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0; - + protected: /// override this function to create destination session virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0; diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp index 7c38801d62..8126e11530 100644 --- a/library/cpp/messagebus/oldmodule/startsession.cpp +++ b/library/cpp/messagebus/oldmodule/startsession.cpp @@ -1,19 +1,19 @@ -/////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////// /// \file -/// \brief Starter session implementation +/// \brief Starter session implementation -/// Starter session will generate emtpy message to insert -/// into local session that are registered under same protocol +/// Starter session will generate emtpy message to insert +/// into local session that are registered under same protocol -/// Starter (will one day) automatically adjust number -/// of message inflight to make sure that at least one of source -/// sessions within message queue is at the limit (bottle neck) - -/// Maximum number of messages that starter will instert into -/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight - -#include "startsession.h" +/// Starter (will one day) automatically adjust number +/// of message inflight to make sure that at least one of source +/// sessions within message queue is at the limit (bottle neck) +/// Maximum number of messages that starter will instert into +/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight + +#include "startsession.h" + #include "module.h" #include <library/cpp/messagebus/ybus.h> @@ -24,7 +24,7 @@ namespace NBus { pThis->Starter(); return nullptr; } - + TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config) : Module(module) , Config(config) @@ -33,11 +33,11 @@ namespace NBus { { StartThread.Start(); } - + TBusStarter::~TBusStarter() { Shutdown(); } - + void TBusStarter::Shutdown() { { TGuard<TMutex> g(ExitLock); @@ -46,20 +46,20 @@ namespace NBus { } StartThread.Join(); } - + void TBusStarter::Starter() { TGuard<TMutex> g(ExitLock); while (!Exiting) { TAutoPtr<TBusMessage> empty(new TBusMessage(0)); - + EMessageStatus status = Module->StartJob(empty); - + if (Config.SendTimeout > 0) { ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout)); } else { ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero()); } - } - } + } + } } diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h index 5e26e7e1e5..c6b407743d 100644 --- a/library/cpp/messagebus/oldmodule/startsession.h +++ b/library/cpp/messagebus/oldmodule/startsession.h @@ -1,12 +1,12 @@ #pragma once - + #include <library/cpp/messagebus/ybus.h> -#include <util/system/thread.h> - -namespace NBus { +#include <util/system/thread.h> + +namespace NBus { class TBusModule; - + class TBusStarter { private: TBusModule* Module; @@ -23,12 +23,12 @@ namespace NBus { TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) { return ""; } - + public: TBusStarter(TBusModule* module, const TBusSessionConfig& config); ~TBusStarter(); void Shutdown(); }; - + } |