diff options
| author | vladimir <[email protected]> | 2022-02-10 16:50:29 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:29 +0300 | 
| commit | 4bac7bacd041dac72ece081598805d03d2e80a3e (patch) | |
| tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/messagebus/oldmodule | |
| parent | 3e7ff6e4ee637c04455854159e84850e613ebc16 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule')
| -rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 56 | ||||
| -rw-r--r-- | library/cpp/messagebus/oldmodule/module.h | 102 | ||||
| -rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.cpp | 40 | ||||
| -rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.h | 14 | 
4 files changed, 106 insertions, 106 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 3abcbfc87bc..24bd778799f 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -1,5 +1,5 @@  #include "module.h" -  +  #include <library/cpp/messagebus/scheduler_actor.h>  #include <library/cpp/messagebus/thread_extra.h>  #include <library/cpp/messagebus/actor/actor.h> @@ -64,7 +64,7 @@ namespace NBus {      namespace NPrivate {          class TJobStorage {          }; -  +          struct TModuleClientHandler             : public IBusClientHandler {              TModuleClientHandler(TBusModuleImpl* module) @@ -334,16 +334,16 @@ namespace NBus {          ClearAllMessageStates();      } -  +      TNetAddr TBusJob::GetPeerAddrNetAddr() const {          Y_VERIFY(!!OnMessageContext);          return OnMessageContext.GetPeerAddrNetAddr();      } -  +      void TBusJob::CheckThreadCurrentJob() {          Y_ASSERT(ThreadCurrentJob == this);      } -  +      /////////////////////////////////////////////////////////      /// \brief Send messages in pending list @@ -405,17 +405,17 @@ namespace NBus {          }          return Pending.size() > 0;      } -  +      bool TBusJob::AnyPendingToSend() {          for (unsigned i = 0; i < Pending.size(); ++i) {              if (Pending[i].Status == MESSAGE_DONT_ASK) {                  return true;              } -        }  +        }          return false; -    }  -  +    } +      bool TBusJob::IsDone() {          bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));          return r; @@ -427,7 +427,7 @@ namespace NBus {          Handler = Handler(ModuleImpl->Module, this, Message);      } -  +      bool TBusJob::CallJobHandler() {          /// go on as far as we can go without waiting          while (!IsDone()) { @@ -465,9 +465,9 @@ namespace NBus {              TThreadCurrentJobGuard threadCurrentJobGuard(this);              (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); -        }  +        }      } -  +      int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {          /// find handler for given message and update it's status          size_t i = 0; @@ -476,34 +476,34 @@ namespace NBus {              if (call.Message == mess) {                  break;              } -        }  +        }          /// if not found, report error          if (i == Pending.size()) {              Y_FAIL("must not happen");          } -  +          /// fill in response into job state          TJobState& call = Pending[i];          call.Status = status;          Y_ASSERT(call.Message == mess);          call.Reply = reply; -  +          if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {              call.NumRetries++;              call.Status = MESSAGE_DONT_ASK;              call.Message->Reset(); // generate new Id              DoCallReplyHandler(call);              return 0; -        }  +        }          /// call the handler if provided          DoCallReplyHandler(call); -  +          /// move job state into the finished stack          Finished.push_back(Pending[i]);          Pending.erase(Pending.begin() + i); -  +          return 0;      } @@ -511,21 +511,21 @@ namespace NBus {      /// send message to any other session or application      void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {          CheckThreadCurrentJob(); -  +          SetJob(mess.Get(), Runner);          Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));      }      void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {          CheckThreadCurrentJob(); -  +          SetJob(mess.Get(), Runner);          Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));      }      void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {          CheckThreadCurrentJob(); -  +          SetJob(req.Get(), Runner);          Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));      } @@ -559,7 +559,7 @@ namespace NBus {          Status = status;      } -  +      void TBusJob::ClearState(TJobState& call) {          TJobStateVec::iterator it;          for (it = Finished.begin(); it != Finished.end(); ++it) { @@ -569,10 +569,10 @@ namespace NBus {                  Finished.erase(it);                  return;              } -        }  +        }          Y_ASSERT(0); -    }  -  +    } +      void TBusJob::ClearAllMessageStates() {          ClearJobStateVector(&Finished);          ClearJobStateVector(&Pending); @@ -586,7 +586,7 @@ namespace NBus {          SleepUntil = Now() + milliSeconds;      } -  +      TString TBusJob::GetStatus(unsigned flags) {          TString strReturn;          strReturn += Sprintf("  job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", @@ -624,8 +624,8 @@ namespace NBus {          if (job) {              job->Cancel(status);          } -    }  -  +    } +      TString TBusModuleImpl::GetStatus(unsigned flags) {          Y_UNUSED(flags);          TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index e263d1b6182..8d1c4a5d52b 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -1,55 +1,55 @@  #pragma once -///////////////////////////////////////////////////////////////////////////  -/// \file  -/// \brief Application interface for modules  +/////////////////////////////////////////////////////////////////////////// +/// \file +/// \brief Application interface for modules -/// NBus::TBusModule provides foundation for implementation of asynchnous  -/// modules that communicate with multiple external or local sessions  +/// NBus::TBusModule provides foundation for implementation of asynchnous +/// modules that communicate with multiple external or local sessions  /// NBus::TBusSession.  /// To implement the module some virtual functions needs to be overridden:  /// NBus::TBusModule::CreateExtSession() creates and registers an -/// external session that receives incoming messages as input for module  +/// external session that receives incoming messages as input for module  /// processing.  /// When new incoming message arrives the new NBus::TBusJob is created. -/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state  +/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state  /// during processing of one incoming message. Default implementation of  /// NBus::TBusJob will maintain all send and received messages during  /// lifetime of this job. Each message, status and reply can be found -/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module  -/// needs to maintain an additional information during lifetime of the job  +/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module +/// needs to maintain an additional information during lifetime of the job  /// you can derive your own class from NBus::TBusJob and override job  /// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.  /// Processing of a given message starts with a call to NBus::TBusModule::Start()  /// handler that should be overridden in the module implementation. Within -/// the callback handler module can perform any computation and access any  -/// datastore tables that it needs. The handler can also access any module  +/// the callback handler module can perform any computation and access any +/// datastore tables that it needs. The handler can also access any module  /// variables. However, same handler can be called from multiple threads so,  /// it is recommended that handler only access read-only module level variables.  /// Handler should use NBus::TBusJob::Send() to send messages to other client -/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main  +/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main  /// job message. When handler is done, it returns the pointer to the next handler to call -/// when all pending messages have cleared. If handler  -/// returns pointer to itself the module will reschedule execution of this handler  -/// for a later time. This should be done in case NBus::TBusJob::Send() returns  -/// error (not MESSAGE_OK)  -  +/// when all pending messages have cleared. If handler +/// returns pointer to itself the module will reschedule execution of this handler +/// for a later time. This should be done in case NBus::TBusJob::Send() returns +/// error (not MESSAGE_OK) +  #include "startsession.h"  #include <library/cpp/messagebus/ybus.h> -  +  #include <util/generic/noncopyable.h>  #include <util/generic/object_counter.h> -namespace NBus {  +namespace NBus {      class TBusJob;      class TBusModule; -  +      namespace NPrivate {          struct TCallJobHandlerWorkItem;          struct TBusModuleImpl; @@ -57,7 +57,7 @@ namespace NBus {          struct TModuleClientHandler;          struct TJobRunner;      } -  +      class TJobHandler {      protected:          typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess); @@ -86,12 +86,12 @@ namespace NBus {              return (b->*MyPtr)(job, mess);          }      }; -  +      typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply); -  +      ////////////////////////////////////////////////////      /// \brief Pending message state -  +      struct TJobState {          friend class TBusJob;          friend class ::TCrawlerModule; @@ -131,9 +131,9 @@ namespace NBus {      public:          TString GetStatus(unsigned flags);      }; -  +      using TJobStateVec = TVector<TJobState>; -  +      /////////////////////////////////////////////////////////      /// \brief Execution item = thread @@ -147,10 +147,10 @@ namespace NBus {      public:          /// given a module and starter message          TBusJob(TBusModule* module, TBusMessage* message); -  +          /// destructor will free all the message that were send and received          virtual ~TBusJob(); -  +          TBusMessage* GetMessage() const {              return Message;          } @@ -161,13 +161,13 @@ namespace NBus {          /// If addr is set then use it as destination.          void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);          void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0); -  +          void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);          void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);          /// send reply to the starter message          virtual void SendReply(TBusMessageAutoPtr reply); -  +          /// set the flag to terminate job at the earliest convenience          void Cancel(EMessageStatus status); @@ -179,7 +179,7 @@ namespace NBus {          void PutState(const TJobState& state) {              Finished.push_back(state);          } -  +      public:          /// retrieve all pending messages          void GetPending(TJobStateVec* stateVec) { @@ -225,10 +225,10 @@ namespace NBus {                      }                      return static_cast<MessageType*>(call.Message);                  } -            }  +            }              return nullptr; -        }  -  +        } +          /// helper function to find status for previously sent message          template <class MessageType>          EMessageStatus GetStatus(int* startFrom = nullptr) { @@ -240,10 +240,10 @@ namespace NBus {                      }                      return call.Status;                  } -            }  +            }              return MESSAGE_UNKNOWN; -        }  -  +        } +          /// helper function to clear state of previosly sent messages          template <class MessageType>          void Clear() { @@ -256,24 +256,24 @@ namespace NBus {                  } else {                      ++i;                  } -            }  -        }  -  +            } +        } +          /// helper function to clear state in order to try again          void ClearState(TJobState& state); -  +          /// clears all message states          void ClearAllMessageStates();          /// returns true if job is done          bool IsDone(); -  +          /// return human reabable status of this job          virtual TString GetStatus(unsigned flags);          /// set sleep time for job          void Sleep(int milliSeconds); -  +          void CallJobHandlerOnly();      private: @@ -297,7 +297,7 @@ namespace NBus {          TOnMessageContext OnMessageContext; // starter      public:          bool ReplySent; -  +      private:          friend class TBusModule;          friend struct NPrivate::TBusModuleImpl; @@ -312,7 +312,7 @@ namespace NBus {          NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job          TBusInstant SleepUntil;               ///< time to wakeup, 0 if no sleep      }; -  +      ////////////////////////////////////////////////////////////////////      /// \brief Classes to implement basic module functionality @@ -358,18 +358,18 @@ namespace NBus {          friend class TBusJob;          TObjectCounter<TBusModule> ObjectCounter; -  +          TIntrusivePtr<NPrivate::TBusModuleImpl> Impl; -  +      public:          /// Each module should have a name which is used as protocol service          TBusModule(const char* name);          ~TBusModule() override;          const char* GetName() const; -  +          void SetConfig(const TBusModuleConfig& config); -  +          /// get status of all jobs in flight          TString GetStatus(unsigned flags = 0); @@ -380,7 +380,7 @@ namespace NBus {          // this default implementation just creates TBusJob object          TBusJob* CreateJobInstance(TBusMessage* message) override; -  +          EMessageStatus StartJob(TAutoPtr<TBusMessage> message);          /// creates private sessions, calls CreateExtSession(), should be called before StartInput() @@ -391,7 +391,7 @@ namespace NBus {      public:          /// entry point into module, first function to call          virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0; -  +      protected:          /// override this function to create destination session          virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0; diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp index 8126e115304..7c38801d626 100644 --- a/library/cpp/messagebus/oldmodule/startsession.cpp +++ b/library/cpp/messagebus/oldmodule/startsession.cpp @@ -1,19 +1,19 @@ -///////////////////////////////////////////////////////////  +///////////////////////////////////////////////////////////  /// \file -/// \brief Starter session implementation  +/// \brief Starter session implementation -/// Starter session will generate emtpy message to insert  -/// into local session that are registered under same protocol  +/// Starter session will generate emtpy message to insert +/// into local session that are registered under same protocol -/// Starter (will one day) automatically adjust number  -/// of message inflight to make sure that at least one of source  -/// sessions within message queue is at the limit (bottle neck)  +/// Starter (will one day) automatically adjust number +/// of message inflight to make sure that at least one of source +/// sessions within message queue is at the limit (bottle neck) + +/// Maximum number of messages that starter will instert into +/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight + +#include "startsession.h" -/// Maximum number of messages that starter will instert into  -/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight  -  -#include "startsession.h"  -   #include "module.h"  #include <library/cpp/messagebus/ybus.h> @@ -24,7 +24,7 @@ namespace NBus {          pThis->Starter();          return nullptr;      } -  +      TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)          : Module(module)          , Config(config) @@ -33,11 +33,11 @@ namespace NBus {      {          StartThread.Start();      } -  +      TBusStarter::~TBusStarter() {          Shutdown();      } -  +      void TBusStarter::Shutdown() {          {              TGuard<TMutex> g(ExitLock); @@ -46,20 +46,20 @@ namespace NBus {          }          StartThread.Join();      } -  +      void TBusStarter::Starter() {          TGuard<TMutex> g(ExitLock);          while (!Exiting) {              TAutoPtr<TBusMessage> empty(new TBusMessage(0)); -  +              EMessageStatus status = Module->StartJob(empty); -  +              if (Config.SendTimeout > 0) {                  ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));              } else {                  ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());              } -        }  -    }  +        } +    }  } diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h index c6b407743db..5e26e7e1e56 100644 --- a/library/cpp/messagebus/oldmodule/startsession.h +++ b/library/cpp/messagebus/oldmodule/startsession.h @@ -1,12 +1,12 @@  #pragma once -  +  #include <library/cpp/messagebus/ybus.h> -#include <util/system/thread.h>  -  -namespace NBus {  +#include <util/system/thread.h> + +namespace NBus {      class TBusModule; -  +      class TBusStarter {      private:          TBusModule* Module; @@ -23,12 +23,12 @@ namespace NBus {          TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) {              return "";          } -  +      public:          TBusStarter(TBusModule* module, const TBusSessionConfig& config);          ~TBusStarter();          void Shutdown();      }; -  +  }  | 
