diff options
author | vladimir <vladimir@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
commit | 3e7ff6e4ee637c04455854159e84850e613ebc16 (patch) | |
tree | 1ea1786a47f104a0657e0f935ce63dcaeec3fd26 /library | |
parent | dad82c0e0157ebad6bfd7cf0e5fb3c15c42922b3 (diff) | |
download | ydb-3e7ff6e4ee637c04455854159e84850e613ebc16.tar.gz |
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/messagebus/coreconn.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/coreconn.h | 14 | ||||
-rw-r--r-- | library/cpp/messagebus/locator.cpp | 72 | ||||
-rw-r--r-- | library/cpp/messagebus/message.cpp | 20 | ||||
-rw-r--r-- | library/cpp/messagebus/messqueue.cpp | 30 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 56 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.h | 102 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.cpp | 40 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.h | 14 | ||||
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.h | 32 | ||||
-rw-r--r-- | library/cpp/messagebus/session.cpp | 18 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/example.h | 30 | ||||
-rw-r--r-- | library/cpp/messagebus/test/perftest/perftest.cpp | 86 | ||||
-rw-r--r-- | library/cpp/messagebus/test/perftest/ya.make | 18 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/moduletest.h | 78 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 126 | ||||
-rw-r--r-- | library/cpp/messagebus/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/ybus.h | 38 |
18 files changed, 391 insertions, 391 deletions
diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp index d9411bb5db..7ea9b6cce7 100644 --- a/library/cpp/messagebus/coreconn.cpp +++ b/library/cpp/messagebus/coreconn.cpp @@ -12,7 +12,7 @@ namespace NBus { TBusInstant Now() { return millisec(); } - + EIpVersion MakeIpVersion(bool allowIpv4, bool allowIpv6) { if (allowIpv4) { if (allowIpv6) { @@ -23,8 +23,8 @@ namespace NBus { } else if (allowIpv6) { return EIP_VERSION_6; } - + ythrow yexception() << "Neither of IPv4/IPv6 is allowed."; } -} +} diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h index fca228d82e..36f6e90165 100644 --- a/library/cpp/messagebus/coreconn.h +++ b/library/cpp/messagebus/coreconn.h @@ -1,8 +1,8 @@ #pragma once -////////////////////////////////////////////////////////////// -/// \file -/// \brief Definitions for asynchonous connection queue +////////////////////////////////////////////////////////////// +/// \file +/// \brief Definitions for asynchonous connection queue #include "base.h" #include "event_loop.h" @@ -21,7 +21,7 @@ #include <util/string/util.h> #include <util/system/condvar.h> #include <util/system/mutex.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/thread/lfqueue.h> #include <deque> @@ -38,12 +38,12 @@ namespace NBus { class TBusConnection; class TBusConnectionFactory; class TBusServerFactory; - + using TBusConnectionList = TList<TBusConnection*>; - + /// @throw yexception EIpVersion MakeIpVersion(bool allowIpv4, bool allowIpv6); - + inline bool WouldBlock() { int syserr = LastSystemError(); return syserr == EAGAIN || syserr == EINPROGRESS || syserr == EWOULDBLOCK || syserr == EINTR; diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp index e38a35c426..0453454680 100644 --- a/library/cpp/messagebus/locator.cpp +++ b/library/cpp/messagebus/locator.cpp @@ -1,9 +1,9 @@ -//////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////// /// \file -/// \brief Implementation of locator service - +/// \brief Implementation of locator service + #include "locator.h" - + #include "ybus.h" #include <util/generic/hash_set.h> @@ -11,7 +11,7 @@ namespace NBus { using namespace NAddr; - + static TIpPort GetAddrPort(const IRemoteAddr& addr) { switch (addr.Addr()->sa_family) { case AF_INET: { @@ -84,8 +84,8 @@ namespace NBus { static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) { return (const sockaddr_in6*)a.Addr(); - } - + } + static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) { if (a1.Addr()->sa_family == a2.Addr()->sa_family) { if (a1.Addr()->sa_family == AF_INET) { @@ -95,13 +95,13 @@ namespace NBus { } } return false; - } + } TBusLocator::TBusLocator() : MyInterfaces(GetNetworkInterfaces()) { } - + bool TBusLocator::TItem::operator<(const TItem& y) const { const TItem& x = *this; @@ -114,7 +114,7 @@ namespace NBus { bool TBusLocator::TItem::operator==(const TItem& y) const { return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr; } - + TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr) : ServiceId(serviceId) , Start(start) @@ -122,7 +122,7 @@ namespace NBus { , Addr(addr) { } - + bool TBusLocator::IsLocal(const TNetAddr& addr) { for (const auto& myInterface : MyInterfaces) { if (IsAddressEqual(addr, *myInterface.Address)) { @@ -132,7 +132,7 @@ namespace NBus { return false; } - + TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) { const char* c = ServiceIdSet.insert(name).first->c_str(); return (ui64)c; @@ -140,7 +140,7 @@ namespace NBus { int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) { TGuard<TMutex> G(Lock); - + TServiceId serviceId = GetServiceId(service); for (size_t i = 0; i < starts.size(); ++i) { RegisterBreak(serviceId, starts[i], addr); @@ -152,7 +152,7 @@ namespace NBus { TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr)); TItems::const_iterator service_it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); - + THolder<TItem> left; THolder<TItem> right; if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) { @@ -174,11 +174,11 @@ namespace NBus { Items.insert(*right); NormalizeBreaks(serviceId); return 0; - } - + } + int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) { TGuard<TMutex> G(Lock); - + TServiceId serviceId = GetServiceId(service); return UnregisterBreak(serviceId, addr); } @@ -225,7 +225,7 @@ namespace NBus { TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr); Items.insert(item); } - + NormalizeBreaks(serviceId); return deleted; } @@ -238,7 +238,7 @@ namespace NBus { if (serviceId != Max<TServiceId>()) { last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr())); } - + --last; Y_ASSERT(Items.end() != last); Y_ASSERT(last->ServiceId == serviceId); @@ -251,10 +251,10 @@ namespace NBus { int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) { TGuard<TMutex> G(Lock); Y_VERIFY(addrs.empty(), "Non emtpy addresses"); - + TServiceId serviceId = GetServiceId(service); TItems::const_iterator it; - + for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr())); it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End; ++it) { @@ -273,27 +273,27 @@ namespace NBus { TServiceId serviceId = GetServiceId(service); TItems::const_iterator it; - + it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr())); - + if (it != Items.end()) { const TItem& item = *it; if (item.ServiceId == serviceId && item.Start <= key && key < item.End) { *addr = item.Addr; - + return 0; } } return -1; - } - + } + int TBusLocator::GetLocalPort(TBusService service) { TGuard<TMutex> G(Lock); TServiceId serviceId = GetServiceId(service); TItems::const_iterator it; int port = 0; - + for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) { const TItem& item = *it; if (item.ServiceId != serviceId) { @@ -357,8 +357,8 @@ namespace NBus { *isLocal = IsLocal(addr); } return 0; - } - + } + int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) { TGuard<TMutex> G(Lock); Y_VERIFY(keys.empty(), "Non empty keys"); @@ -376,8 +376,8 @@ namespace NBus { keys.push_back(std::make_pair(item.Start, item.End)); } return (int)keys.size(); - } - + } + int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) { TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws { @@ -387,13 +387,13 @@ namespace NBus { Register(service, start, end, addr); return 0; } - + int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) { TNetAddr addr(na, requireVersion, preferVersion); // throws Register(service, start, end, addr); return 0; - } - + } + int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) { TGuard<TMutex> G(Lock); @@ -412,7 +412,7 @@ namespace NBus { Y_FAIL("Overlap in registered keys with non-identical range"); } } - + Items.insert(itemToReg); return 0; } @@ -424,4 +424,4 @@ namespace NBus { return 0; } -} +} diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp index bfa7ed8e9b..e4d483901d 100644 --- a/library/cpp/messagebus/message.cpp +++ b/library/cpp/messagebus/message.cpp @@ -3,15 +3,15 @@ #include <util/random/random.h> #include <util/string/printf.h> -#include <util/system/atomic.h> - +#include <util/system/atomic.h> + #include <string.h> - + using namespace NBus; namespace NBus { using namespace NBus::NPrivate; - + TBusIdentity::TBusIdentity() : MessageId(0) , Size(0) @@ -73,7 +73,7 @@ namespace NBus { GetHeader()->Type = type; DoReset(); } - + TBusMessage::TBusMessage(ECreateUninitialized) //: TCtr("BusMessage") : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1) @@ -81,11 +81,11 @@ namespace NBus { , Data(nullptr) { } - + TString TBusMessage::Describe() const { return Sprintf("object type: %s, message type: %d", TypeName(*this).data(), int(GetHeader()->Type)); } - + TBusMessage::~TBusMessage() { #ifndef NDEBUG Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type)); @@ -94,7 +94,7 @@ namespace NBus { CheckClean(); #endif } - + void TBusMessage::DoReset() { GetHeader()->SendTime = 0; GetHeader()->Size = 0; @@ -125,7 +125,7 @@ namespace NBus { memcpy(this, data.data(), sizeof(TBusHeader)); return sizeof(TBusHeader); } - + /////////////////////////////////////////////////////// /// \brief Packs header to network order @@ -140,7 +140,7 @@ namespace NBus { data.Flags = GetHeader()->FlagsInternal; //data.LocalFlags = LocalFlags; } - + //////////////////////////////////////////////////////////// /// \brief set message identity from serialized form diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index 3474d62705..8040dbf69e 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -3,13 +3,13 @@ #include "remote_client_session.h" #include "remote_server_session.h" #include "ybus.h" - + #include <util/generic/singleton.h> using namespace NBus; using namespace NBus::NPrivate; using namespace NActor; - + TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) { return new TBusMessageQueue(config, executor, locator, name); } @@ -20,8 +20,8 @@ TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBus executorConfig.Name = name; TExecutorPtr executor = new TExecutor(executorConfig); return CreateMessageQueue(config, executor, locator, name); -} - +} + TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) { return CreateMessageQueue(config, new TBusLocator, name); } @@ -50,15 +50,15 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e , Locator(locator) , WorkQueue(executor) , Running(1) -{ +{ InitBusLwtrace(); InitNetworkSubSystem(); -} - -TBusMessageQueue::~TBusMessageQueue() { +} + +TBusMessageQueue::~TBusMessageQueue() { Stop(); -} - +} + void TBusMessageQueue::Stop() { if (!AtomicCas(&Running, 0, 1)) { ShutdownComplete.WaitI(); @@ -128,8 +128,8 @@ TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusCli TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name)); Add(session.Get()); return session.Get(); -} - +} + TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) { TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name)); try { @@ -140,7 +140,7 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB if (port == 0) { port = proto->GetPort(); } - + session->Listen(port, this); Add(session.Get()); @@ -164,8 +164,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) { TGuard<TMutex> scope(Lock); Sessions.push_back(session); -} - +} + void TBusMessageQueue::Remove(TBusSession* session) { TGuard<TMutex> scope(Lock); TList<TIntrusivePtr<TBusSessionImpl>>::iterator it = std::find(Sessions.begin(), Sessions.end(), session); diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..3abcbfc87b 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -1,5 +1,5 @@ #include "module.h" - + #include <library/cpp/messagebus/scheduler_actor.h> #include <library/cpp/messagebus/thread_extra.h> #include <library/cpp/messagebus/actor/actor.h> @@ -64,7 +64,7 @@ namespace NBus { namespace NPrivate { class TJobStorage { }; - + struct TModuleClientHandler : public IBusClientHandler { TModuleClientHandler(TBusModuleImpl* module) @@ -334,16 +334,16 @@ namespace NBus { ClearAllMessageStates(); } - + TNetAddr TBusJob::GetPeerAddrNetAddr() const { Y_VERIFY(!!OnMessageContext); return OnMessageContext.GetPeerAddrNetAddr(); } - + void TBusJob::CheckThreadCurrentJob() { Y_ASSERT(ThreadCurrentJob == this); } - + ///////////////////////////////////////////////////////// /// \brief Send messages in pending list @@ -405,17 +405,17 @@ namespace NBus { } return Pending.size() > 0; } - + bool TBusJob::AnyPendingToSend() { for (unsigned i = 0; i < Pending.size(); ++i) { if (Pending[i].Status == MESSAGE_DONT_ASK) { return true; } - } + } return false; - } - + } + bool TBusJob::IsDone() { bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); return r; @@ -427,7 +427,7 @@ namespace NBus { Handler = Handler(ModuleImpl->Module, this, Message); } - + bool TBusJob::CallJobHandler() { /// go on as far as we can go without waiting while (!IsDone()) { @@ -465,9 +465,9 @@ namespace NBus { TThreadCurrentJobGuard threadCurrentJobGuard(this); (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); - } + } } - + int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { /// find handler for given message and update it's status size_t i = 0; @@ -476,34 +476,34 @@ namespace NBus { if (call.Message == mess) { break; } - } + } /// if not found, report error if (i == Pending.size()) { Y_FAIL("must not happen"); } - + /// fill in response into job state TJobState& call = Pending[i]; call.Status = status; Y_ASSERT(call.Message == mess); call.Reply = reply; - + if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { call.NumRetries++; call.Status = MESSAGE_DONT_ASK; call.Message->Reset(); // generate new Id DoCallReplyHandler(call); return 0; - } + } /// call the handler if provided DoCallReplyHandler(call); - + /// move job state into the finished stack Finished.push_back(Pending[i]); Pending.erase(Pending.begin() + i); - + return 0; } @@ -511,21 +511,21 @@ namespace NBus { /// send message to any other session or application void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { CheckThreadCurrentJob(); - + SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); } void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { CheckThreadCurrentJob(); - + SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); } void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { CheckThreadCurrentJob(); - + SetJob(req.Get(), Runner); Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); } @@ -559,7 +559,7 @@ namespace NBus { Status = status; } - + void TBusJob::ClearState(TJobState& call) { TJobStateVec::iterator it; for (it = Finished.begin(); it != Finished.end(); ++it) { @@ -569,10 +569,10 @@ namespace NBus { Finished.erase(it); return; } - } + } Y_ASSERT(0); - } - + } + void TBusJob::ClearAllMessageStates() { ClearJobStateVector(&Finished); ClearJobStateVector(&Pending); @@ -586,7 +586,7 @@ namespace NBus { SleepUntil = Now() + milliSeconds; } - + TString TBusJob::GetStatus(unsigned flags) { TString strReturn; strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", @@ -624,8 +624,8 @@ namespace NBus { if (job) { job->Cancel(status); } - } - + } + TString TBusModuleImpl::GetStatus(unsigned flags) { Y_UNUSED(flags); TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index 8d1c4a5d52..e263d1b618 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -1,55 +1,55 @@ #pragma once -/////////////////////////////////////////////////////////////////////////// -/// \file -/// \brief Application interface for modules +/////////////////////////////////////////////////////////////////////////// +/// \file +/// \brief Application interface for modules -/// NBus::TBusModule provides foundation for implementation of asynchnous -/// modules that communicate with multiple external or local sessions +/// NBus::TBusModule provides foundation for implementation of asynchnous +/// modules that communicate with multiple external or local sessions /// NBus::TBusSession. /// To implement the module some virtual functions needs to be overridden: /// NBus::TBusModule::CreateExtSession() creates and registers an -/// external session that receives incoming messages as input for module +/// external session that receives incoming messages as input for module /// processing. /// When new incoming message arrives the new NBus::TBusJob is created. -/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state +/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state /// during processing of one incoming message. Default implementation of /// NBus::TBusJob will maintain all send and received messages during /// lifetime of this job. Each message, status and reply can be found -/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module -/// needs to maintain an additional information during lifetime of the job +/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module +/// needs to maintain an additional information during lifetime of the job /// you can derive your own class from NBus::TBusJob and override job /// factory method NBus::IJobFactory::CreateJobInstance() to create your instances. /// Processing of a given message starts with a call to NBus::TBusModule::Start() /// handler that should be overridden in the module implementation. Within -/// the callback handler module can perform any computation and access any -/// datastore tables that it needs. The handler can also access any module +/// the callback handler module can perform any computation and access any +/// datastore tables that it needs. The handler can also access any module /// variables. However, same handler can be called from multiple threads so, /// it is recommended that handler only access read-only module level variables. /// Handler should use NBus::TBusJob::Send() to send messages to other client -/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main +/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main /// job message. When handler is done, it returns the pointer to the next handler to call -/// when all pending messages have cleared. If handler -/// returns pointer to itself the module will reschedule execution of this handler -/// for a later time. This should be done in case NBus::TBusJob::Send() returns -/// error (not MESSAGE_OK) - +/// when all pending messages have cleared. If handler +/// returns pointer to itself the module will reschedule execution of this handler +/// for a later time. This should be done in case NBus::TBusJob::Send() returns +/// error (not MESSAGE_OK) + #include "startsession.h" #include <library/cpp/messagebus/ybus.h> - + #include <util/generic/noncopyable.h> #include <util/generic/object_counter.h> -namespace NBus { +namespace NBus { class TBusJob; class TBusModule; - + namespace NPrivate { struct TCallJobHandlerWorkItem; struct TBusModuleImpl; @@ -57,7 +57,7 @@ namespace NBus { struct TModuleClientHandler; struct TJobRunner; } - + class TJobHandler { protected: typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess); @@ -86,12 +86,12 @@ namespace NBus { return (b->*MyPtr)(job, mess); } }; - + typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply); - + //////////////////////////////////////////////////// /// \brief Pending message state - + struct TJobState { friend class TBusJob; friend class ::TCrawlerModule; @@ -131,9 +131,9 @@ namespace NBus { public: TString GetStatus(unsigned flags); }; - + using TJobStateVec = TVector<TJobState>; - + ///////////////////////////////////////////////////////// /// \brief Execution item = thread @@ -147,10 +147,10 @@ namespace NBus { public: /// given a module and starter message TBusJob(TBusModule* module, TBusMessage* message); - + /// destructor will free all the message that were send and received virtual ~TBusJob(); - + TBusMessage* GetMessage() const { return Message; } @@ -161,13 +161,13 @@ namespace NBus { /// If addr is set then use it as destination. void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr); void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0); - + void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr); void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session); /// send reply to the starter message virtual void SendReply(TBusMessageAutoPtr reply); - + /// set the flag to terminate job at the earliest convenience void Cancel(EMessageStatus status); @@ -179,7 +179,7 @@ namespace NBus { void PutState(const TJobState& state) { Finished.push_back(state); } - + public: /// retrieve all pending messages void GetPending(TJobStateVec* stateVec) { @@ -225,10 +225,10 @@ namespace NBus { } return static_cast<MessageType*>(call.Message); } - } + } return nullptr; - } - + } + /// helper function to find status for previously sent message template <class MessageType> EMessageStatus GetStatus(int* startFrom = nullptr) { @@ -240,10 +240,10 @@ namespace NBus { } return call.Status; } - } + } return MESSAGE_UNKNOWN; - } - + } + /// helper function to clear state of previosly sent messages template <class MessageType> void Clear() { @@ -256,24 +256,24 @@ namespace NBus { } else { ++i; } - } - } - + } + } + /// helper function to clear state in order to try again void ClearState(TJobState& state); - + /// clears all message states void ClearAllMessageStates(); /// returns true if job is done bool IsDone(); - + /// return human reabable status of this job virtual TString GetStatus(unsigned flags); /// set sleep time for job void Sleep(int milliSeconds); - + void CallJobHandlerOnly(); private: @@ -297,7 +297,7 @@ namespace NBus { TOnMessageContext OnMessageContext; // starter public: bool ReplySent; - + private: friend class TBusModule; friend struct NPrivate::TBusModuleImpl; @@ -312,7 +312,7 @@ namespace NBus { NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep }; - + //////////////////////////////////////////////////////////////////// /// \brief Classes to implement basic module functionality @@ -358,18 +358,18 @@ namespace NBus { friend class TBusJob; TObjectCounter<TBusModule> ObjectCounter; - + TIntrusivePtr<NPrivate::TBusModuleImpl> Impl; - + public: /// Each module should have a name which is used as protocol service TBusModule(const char* name); ~TBusModule() override; const char* GetName() const; - + void SetConfig(const TBusModuleConfig& config); - + /// get status of all jobs in flight TString GetStatus(unsigned flags = 0); @@ -380,7 +380,7 @@ namespace NBus { // this default implementation just creates TBusJob object TBusJob* CreateJobInstance(TBusMessage* message) override; - + EMessageStatus StartJob(TAutoPtr<TBusMessage> message); /// creates private sessions, calls CreateExtSession(), should be called before StartInput() @@ -391,7 +391,7 @@ namespace NBus { public: /// entry point into module, first function to call virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0; - + protected: /// override this function to create destination session virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0; diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp index 7c38801d62..8126e11530 100644 --- a/library/cpp/messagebus/oldmodule/startsession.cpp +++ b/library/cpp/messagebus/oldmodule/startsession.cpp @@ -1,19 +1,19 @@ -/////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////// /// \file -/// \brief Starter session implementation +/// \brief Starter session implementation -/// Starter session will generate emtpy message to insert -/// into local session that are registered under same protocol +/// Starter session will generate emtpy message to insert +/// into local session that are registered under same protocol -/// Starter (will one day) automatically adjust number -/// of message inflight to make sure that at least one of source -/// sessions within message queue is at the limit (bottle neck) - -/// Maximum number of messages that starter will instert into -/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight - -#include "startsession.h" +/// Starter (will one day) automatically adjust number +/// of message inflight to make sure that at least one of source +/// sessions within message queue is at the limit (bottle neck) +/// Maximum number of messages that starter will instert into +/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight + +#include "startsession.h" + #include "module.h" #include <library/cpp/messagebus/ybus.h> @@ -24,7 +24,7 @@ namespace NBus { pThis->Starter(); return nullptr; } - + TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config) : Module(module) , Config(config) @@ -33,11 +33,11 @@ namespace NBus { { StartThread.Start(); } - + TBusStarter::~TBusStarter() { Shutdown(); } - + void TBusStarter::Shutdown() { { TGuard<TMutex> g(ExitLock); @@ -46,20 +46,20 @@ namespace NBus { } StartThread.Join(); } - + void TBusStarter::Starter() { TGuard<TMutex> g(ExitLock); while (!Exiting) { TAutoPtr<TBusMessage> empty(new TBusMessage(0)); - + EMessageStatus status = Module->StartJob(empty); - + if (Config.SendTimeout > 0) { ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout)); } else { ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero()); } - } - } + } + } } diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h index 5e26e7e1e5..c6b407743d 100644 --- a/library/cpp/messagebus/oldmodule/startsession.h +++ b/library/cpp/messagebus/oldmodule/startsession.h @@ -1,12 +1,12 @@ #pragma once - + #include <library/cpp/messagebus/ybus.h> -#include <util/system/thread.h> - -namespace NBus { +#include <util/system/thread.h> + +namespace NBus { class TBusModule; - + class TBusStarter { private: TBusModule* Module; @@ -23,12 +23,12 @@ namespace NBus { TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) { return ""; } - + public: TBusStarter(TBusModule* module, const TBusSessionConfig& config); ~TBusStarter(); void Shutdown(); }; - + } diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h index 57b4267ea5..7e3a125868 100644 --- a/library/cpp/messagebus/protobuf/ybusbuf.h +++ b/library/cpp/messagebus/protobuf/ybusbuf.h @@ -1,24 +1,24 @@ #pragma once - + #include <library/cpp/messagebus/ybus.h> - + #include <google/protobuf/descriptor.h> #include <google/protobuf/message.h> - + #include <util/generic/cast.h> #include <util/generic/vector.h> #include <util/stream/mem.h> - + #include <array> -namespace NBus { +namespace NBus { using TBusBufferRecord = ::google::protobuf::Message; - + template <class TBufferMessage> class TBusBufferMessagePtr; template <class TBufferMessage> class TBusBufferMessageAutoPtr; - + class TBusBufferBase: public TBusMessage { public: TBusBufferBase(int type) @@ -43,7 +43,7 @@ namespace NBus { /// @param TBufferRecord is record described in .proto file with namespace /// @param MessageFile is offset for .proto file message ids - + /// \attention If you want one protocol NBus::TBusBufferProtocol to handle /// messageges described in different .proto files, make sure that they have /// unique values for MessageFile @@ -52,7 +52,7 @@ namespace NBus { class TBusBufferMessage: public TBusBufferBase { public: static const int MessageType = MType; - + typedef TBusBufferMessagePtr<TBusBufferMessage<TBufferRecord, MType>> TPtr; typedef TBusBufferMessageAutoPtr<TBusBufferMessage<TBufferRecord, MType>> TAutoPtr; @@ -88,7 +88,7 @@ namespace NBus { return new TBusBufferMessage<TBufferRecord, MessageType>(); } }; - + template <class TSelf, class TBufferMessage> class TBusBufferMessagePtrBase { public: @@ -101,7 +101,7 @@ namespace NBus { const TSelf* GetSelf() const { return static_cast<const TSelf*>(this); } - + public: RecordType* operator->() { Y_ASSERT(GetSelf()->Get()); @@ -119,7 +119,7 @@ namespace NBus { Y_ASSERT(GetSelf()->Get()); return GetSelf()->Get()->Record; } - + TBusHeader* GetHeader() { return GetSelf()->Get()->GetHeader(); } @@ -127,7 +127,7 @@ namespace NBus { return GetSelf()->Get()->GetHeader(); } }; - + template <class TBufferMessage> class TBusBufferMessagePtr: public TBusBufferMessagePtrBase<TBusBufferMessagePtr<TBufferMessage>, TBufferMessage> { protected: @@ -179,7 +179,7 @@ namespace NBus { : AutoPtr(message) { } - + TBufferMessage* Get() { return AutoPtr.Get(); } @@ -198,7 +198,7 @@ namespace NBus { return AutoPtr.Release(); } }; - + ///////////////////////////////////////////// /// \brief Generic protocol object for messages descibed with protobuf @@ -223,7 +223,7 @@ namespace NBus { void RegisterType(TAutoPtr<TBusBufferBase> mess); TArrayRef<TBusBufferBase* const> GetTypes() const; - + /// serialized protocol specific data into TBusData void Serialize(const TBusMessage* mess, TBuffer& data) override; diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp index 46a7ece6a8..142fba2c71 100644 --- a/library/cpp/messagebus/session.cpp +++ b/library/cpp/messagebus/session.cpp @@ -7,7 +7,7 @@ using namespace NBus; namespace NBus { TBusSession::TBusSession() { } - + //////////////////////////////////////////////////////////////////// /// \brief Adds peer of connection into connection list @@ -20,7 +20,7 @@ namespace NBus { case AF_INET: { return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr)); } - + case AF_INET6: { return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr)); } @@ -32,13 +32,13 @@ namespace NBus { bool operator<(const TNetAddr& a1, const TNetAddr& a2) { return CompareByHost(a1, a2) < 0; } - + size_t TBusSession::GetInFlight(const TNetAddr& addr) const { size_t r; GetInFlightBulk({addr}, MakeArrayRef(&r, 1)); return r; } - + size_t TBusSession::GetConnectSyscallsNumForTest(const TNetAddr& addr) const { size_t r; GetConnectSyscallsNumBulkForTest({addr}, MakeArrayRef(&r, 1)); @@ -107,16 +107,16 @@ namespace NBus { return -1; } } - + TBusService service = GetProto()->GetService(); return GetQueue()->GetLocator()->Register(service, hostName.data(), portNum, start, end, ipVersion); - } - + } + TBusSession::~TBusSession() { } -} - +} + TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue) { return queue->CreateSource(proto, handler, config); } diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h index 26b7475308..5aa9b53df0 100644 --- a/library/cpp/messagebus/test/helper/example.h +++ b/library/cpp/messagebus/test/helper/example.h @@ -1,10 +1,10 @@ #pragma once - + #include <library/cpp/testing/unittest/registar.h> #include "alloc_counter.h" #include "message_handler_error.h" - + #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/misc/test_sync.h> @@ -14,13 +14,13 @@ namespace NBus { namespace NTest { class TExampleRequest: public TBusMessage { friend class TExampleProtocol; - + private: TAllocCounter AllocCounter; public: TString Data; - + public: TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); @@ -28,10 +28,10 @@ namespace NBus { class TExampleResponse: public TBusMessage { friend class TExampleProtocol; - + private: TAllocCounter AllocCounter; - + public: TString Data; TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); @@ -47,11 +47,11 @@ namespace NBus { TAtomic StartCount; TExampleProtocol(int port = 0); - + ~TExampleProtocol() override; - + void Serialize(const TBusMessage* message, TBuffer& buffer) override; - + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; }; @@ -77,7 +77,7 @@ namespace NBus { ~TExampleClient() override; EMessageStatus SendMessage(const TNetAddr* addr = nullptr); - + void SendMessages(size_t count, const TNetAddr* addr = nullptr); void SendMessages(size_t count, const TNetAddr& addr); @@ -90,7 +90,7 @@ namespace NBus { void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; - + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; }; @@ -101,12 +101,12 @@ namespace NBus { bool AckMessageBeforeSendReply; TMaybe<size_t> DataSize; // Nothing means use request size bool ForgetRequest; - + TTestSync TestSync; TBusMessageQueuePtr Bus; TBusServerSessionPtr Session; - + public: TExampleServer( const char* name = "TExampleServer", @@ -115,7 +115,7 @@ namespace NBus { TExampleServer(unsigned port, const char* name = "TExampleServer"); ~TExampleServer() override; - + public: size_t GetInFlight() const; unsigned GetActualListenPort() const; @@ -127,6 +127,6 @@ namespace NBus { protected: void OnMessage(TOnMessageContext& mess) override; }; - + } } diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index 8489319278..6ca846099f 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -1,5 +1,5 @@ #include "simple_proto.h" - + #include <library/cpp/messagebus/test/perftest/messages.pb.h> #include <library/cpp/messagebus/text_utils.h> @@ -448,66 +448,66 @@ private: } }; -// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 - -using namespace std; -using namespace NBus; - +// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 + +using namespace std; +using namespace NBus; + static TNetworkAddress ParseNetworkAddress(const char* string) { TString Name; int Port; - + const char* port = strchr(string, ':'); - + if (port != nullptr) { Name.append(string, port - string); - Port = atoi(port + 1); + Port = atoi(port + 1); } else { Name.append(string); Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; - } - + } + return TNetworkAddress(Name, Port); } TVector<TNetAddr> ParseNodes(const TString nodes) { TVector<TNetAddr> r; - + TVector<TString> hosts; - + size_t numh = Split(nodes.data(), ",", hosts); - - for (int i = 0; i < int(numh); i++) { + + for (int i = 0; i < int(numh); i++) { const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data()); Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses"); r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); - } - + } + return r; -} - +} + TPerftestConfig::TPerftestConfig() { TBusSessionConfig defaultConfig; ServerPort = DEFAULT_PORT; - Delay = 0; // artificial delay inside server OnMessage() + Delay = 0; // artificial delay inside server OnMessage() MessageSize = 200; - Failure = 0.00; - Run = 60; // in seconds - Nodes = "localhost"; + Failure = 0.00; + Run = 60; // in seconds + Nodes = "localhost"; ServerUseModules = false; ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; UseCompression = false; Profile = false; WwwPort = 0; -} - +} + TPerftestConfig* TheConfig = new TPerftestConfig(); bool TheExit = false; TSystemEvent StopEvent; - + TSimpleSharedPtr<TPerftestServer> Server; TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; @@ -516,13 +516,13 @@ TMutex ClientsLock; void stopsignal(int /*sig*/) { fprintf(stderr, "\n-------------------- exiting ------------------\n"); - TheExit = true; + TheExit = true; StopEvent.Signal(); -} - -// -s <num> - start server on port <num> -// -c <node:port,node:port> - start client - +} + +// -s <num> - start server on port <num> +// -c <node:port,node:port> - start client + void TTestStats::PeriodicallyPrint() { SetCurrentThreadName("print-stats"); @@ -589,7 +589,7 @@ void TTestStats::PeriodicallyPrint() { int main(int argc, char* argv[]) { NLWTrace::StartLwtraceFromEnv(); - + /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); @@ -600,7 +600,7 @@ int main(int argc, char* argv[]) { SetAsyncSignalHandler(SIGUSR1, stopsignal); #endif signal(SIGPIPE, SIG_IGN); - + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); @@ -621,7 +621,7 @@ int main(int argc, char* argv[]) { opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); opts.AddHelpOption(); - + Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); @@ -631,9 +631,9 @@ int main(int argc, char* argv[]) { NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); - TheConfig->Print(); + TheConfig->Print(); Config.Print(); - + if (TheConfig->Profile) { BeginProfiling(); } @@ -642,7 +642,7 @@ int main(int argc, char* argv[]) { ServerAddresses = ParseNodes(TheConfig->Nodes); - if (TheConfig->ServerPort) { + if (TheConfig->ServerPort) { if (TheConfig->ServerUseModules) { ServerUsingModule = new TPerftestUsingModule(); www->RegisterModule(ServerUsingModule.Get()); @@ -650,7 +650,7 @@ int main(int argc, char* argv[]) { Server = new TPerftestServer(); www->RegisterServerSession(Server->Session); } - } + } TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; @@ -661,8 +661,8 @@ int main(int argc, char* argv[]) { futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back()))); www->RegisterClientSession(Clients.back()->Session); } - } - + } + futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats)))); THolder<TBusWwwHttpServer> wwwServer; @@ -709,5 +709,5 @@ int main(int argc, char* argv[]) { } Cerr << "***SUCCESS***\n"; - return 0; -} + return 0; +} diff --git a/library/cpp/messagebus/test/perftest/ya.make b/library/cpp/messagebus/test/perftest/ya.make index 24c2848ed5..0d4288cee9 100644 --- a/library/cpp/messagebus/test/perftest/ya.make +++ b/library/cpp/messagebus/test/perftest/ya.make @@ -1,8 +1,8 @@ PROGRAM(messagebus_perftest) - + OWNER(g:messagebus) -PEERDIR( +PEERDIR( library/cpp/deprecated/threadable library/cpp/execprofile library/cpp/getopt @@ -13,12 +13,12 @@ PEERDIR( library/cpp/messagebus/www library/cpp/sighandler library/cpp/threading/future -) - -SRCS( +) + +SRCS( messages.proto - perftest.cpp + perftest.cpp simple_proto.cpp -) - -END() +) + +END() diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h index d5da72c0cb..a128d3ab21 100644 --- a/library/cpp/messagebus/test/ut/moduletest.h +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -1,9 +1,9 @@ #pragma once - -/////////////////////////////////////////////////////////////////// -/// \file -/// \brief Example of using local session for communication. - + +/////////////////////////////////////////////////////////////////// +/// \file +/// \brief Example of using local session for communication. + #include <library/cpp/messagebus/test/helper/alloc_counter.h> #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> @@ -14,10 +14,10 @@ namespace NBus { namespace NTest { using namespace std; - + #define TYPE_HOSTINFOREQUEST 100 -#define TYPE_HOSTINFORESPONSE 101 - +#define TYPE_HOSTINFORESPONSE 101 + //////////////////////////////////////////////////////////////////// /// \brief DupDetect protocol that common between client and server //////////////////////////////////////////////////////////////////// @@ -32,11 +32,11 @@ namespace NBus { : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) { } - + ~THostInfoMessage() override { } }; - + //////////////////////////////////////////////////////////////////// /// \brief HostInfo reply class class THostInfoReply: public TBusMessage { @@ -49,11 +49,11 @@ namespace NBus { : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) { } - + ~THostInfoReply() override { } }; - + //////////////////////////////////////////////////////////////////// /// \brief HostInfo protocol that common between client and server class THostInfoProtocol: public TBusProtocol { @@ -67,7 +67,7 @@ namespace NBus { Y_UNUSED(data); Y_UNUSED(mess); } - + /// deserialized TBusData into new instance of the message TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { Y_UNUSED(payload); @@ -81,23 +81,23 @@ namespace NBus { } } }; - + ////////////////////////////////////////////////////////////// /// \brief HostInfo handler (should convert it to module too) struct THostInfoHandler: public TBusServerHandlerError { TBusServerSessionPtr Session; TBusServerSessionConfig HostInfoConfig; THostInfoProtocol HostInfoProto; - + THostInfoHandler(TBusMessageQueue* queue) { Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); } - + void OnMessage(TOnMessageContext& mess) override { usleep(10 * 1000); /// pretend we are doing something - + TAutoPtr<THostInfoReply> reply(new THostInfoReply()); - + mess.SendReplyMove(reply); } @@ -105,7 +105,7 @@ namespace NBus { return TNetAddr("localhost", Session->GetActualListenPort()); } }; - + ////////////////////////////////////////////////////////////// /// \brief DupDetect handler (should convert it to module too) struct TDupDetectHandler: public TBusClientHandlerError { @@ -114,34 +114,34 @@ namespace NBus { TBusClientSessionPtr DupDetect; TBusClientSessionConfig DupDetectConfig; TExampleProtocol DupDetectProto; - + int NumMessages; int NumReplies; - + TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) : ServerAddr(serverAddr) { DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); DupDetect->RegisterService("localhost"); } - + void Work() { NumMessages = 10; NumReplies = 0; - + for (int i = 0; i < NumMessages; i++) { TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); DupDetect->SendMessage(mess, &ServerAddr); } } - + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { Y_UNUSED(mess); Y_UNUSED(reply); NumReplies++; } }; - + ///////////////////////////////////////////////////////////////// /// \brief DupDetect module @@ -151,12 +151,12 @@ namespace NBus { TBusClientSessionPtr HostInfoClientSession; TBusClientSessionConfig HostInfoConfig; THostInfoProtocol HostInfoProto; - + TExampleProtocol DupDetectProto; TBusServerSessionConfig DupDetectConfig; - + TNetAddr ListenAddr; - + TDupDetectModule(const TNetAddr& hostInfoAddr) : TBusModule("DUPDETECTMODULE") , HostInfoAddr(hostInfoAddr) @@ -166,10 +166,10 @@ namespace NBus { bool Init(TBusMessageQueue* queue) { HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); HostInfoClientSession->RegisterService("localhost"); - + return TBusModule::CreatePrivateSessions(queue); } - + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); @@ -177,14 +177,14 @@ namespace NBus { return session; } - + /// entry point into module, first function to call TJobHandler Start(TBusJob* job, TBusMessage* mess) override { TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); Y_UNUSED(dmess); - + THostInfoMessage* hmess = new THostInfoMessage(); - + /// send message to imaginary hostinfo server job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); @@ -195,27 +195,27 @@ namespace NBus { TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); Y_UNUSED(dmess); - + THostInfoMessage* hmess = job->Get<THostInfoMessage>(); THostInfoReply* hreply = job->Get<THostInfoReply>(); EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); Y_ASSERT(hmess != nullptr); Y_ASSERT(hreply != nullptr); Y_ASSERT(hstatus == MESSAGE_OK); - + return TJobHandler(&TDupDetectModule::Finish); } /// last handler sends reply and returns NULL TJobHandler Finish(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - + TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); job->SendReply(reply); - + return nullptr; } }; - - } + + } } diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 9c21227e2b..61d3a465a7 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -1,134 +1,134 @@ -/////////////////////////////////////////////////////////////////// -/// \file +/////////////////////////////////////////////////////////////////// +/// \file /// \brief Example of reply-less communication -/// This example demostrates how asynchronous message passing library -/// can be used to send message and do not wait for reply back. -/// The usage of reply-less communication should be restricted to -/// low-throughput clients and high-throughput server to provide reasonable -/// utility. Removing replies from the communication removes any restriction -/// on how many message can be send to server and rougue clients may overwelm -/// server without thoughtput control. +/// This example demostrates how asynchronous message passing library +/// can be used to send message and do not wait for reply back. +/// The usage of reply-less communication should be restricted to +/// low-throughput clients and high-throughput server to provide reasonable +/// utility. Removing replies from the communication removes any restriction +/// on how many message can be send to server and rougue clients may overwelm +/// server without thoughtput control. -/// 1) To implement reply-less client \n +/// 1) To implement reply-less client \n /// Call NBus::TBusSession::AckMessage() /// from within NBus::IMessageHandler::OnSent() handler when message has /// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). -/// Discard identity for reply message. +/// Discard identity for reply message. -/// 2) To implement reply-less server \n +/// 2) To implement reply-less server \n -/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() +/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() /// handler when message has been received on server end. /// See example in NBus::NullServer::OnMessage(). -/// Discard identity for reply message. - +/// Discard identity for reply message. + #include <library/cpp/messagebus/test/helper/alloc_counter.h> #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/hanging_server.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> #include <library/cpp/messagebus/test/helper/wait_for.h> - + #include <library/cpp/messagebus/ybus.h> -using namespace std; +using namespace std; using namespace NBus; using namespace NBus::NPrivate; using namespace NBus::NTest; - -//////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////// -/// \brief Reply-less client and handler + +//////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////// +/// \brief Reply-less client and handler struct NullClient : TBusClientHandlerError { TNetAddr ServerAddr; TBusMessageQueuePtr Queue; TBusClientSessionPtr Session; TExampleProtocol Proto; - - /// constructor creates instances of protocol and session + + /// constructor creates instances of protocol and session NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) : ServerAddr(serverAddr) { UNIT_ASSERT(serverAddr.GetPort() > 0); - - /// create or get instance of message queue, need one per application + + /// create or get instance of message queue, need one per application Queue = CreateMessageQueue(); - - /// register source/client session + + /// register source/client session Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); /// register service, announce to clients via LocatorService Session->RegisterService("localhost"); - } - + } + ~NullClient() override { Session->Shutdown(); } - /// dispatch of requests is done here + /// dispatch of requests is done here void Work() { - int batch = 10; - + int batch = 10; + for (int i = 0; i < batch; i++) { TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); mess->Data = "TADA"; Session->SendMessageOneWay(mess, &ServerAddr); - } - } - + } + } + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { - } -}; - -///////////////////////////////////////////////////////////////////// -/// \brief Reply-less server and handler + } +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Reply-less server and handler class NullServer: public TBusServerHandlerError { -public: - /// session object to maintian +public: + /// session object to maintian TBusMessageQueuePtr Queue; TBusServerSessionPtr Session; TExampleProtocol Proto; - -public: + +public: TAtomic NumMessages; - - NullServer() { - NumMessages = 0; - - /// create or get instance of single message queue, need one for application + + NullServer() { + NumMessages = 0; + + /// create or get instance of single message queue, need one for application Queue = CreateMessageQueue(); - - /// register destination session + + /// register destination session TBusServerSessionConfig sessionConfig; Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); - } - + } + ~NullServer() override { Session->Shutdown(); } - /// when message comes do not send reply, just acknowledge + /// when message comes do not send reply, just acknowledge void OnMessage(TOnMessageContext& mess) override { TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); - + Y_ASSERT(fmess->Data == "TADA"); - - /// tell session to forget this message and never expect any reply + + /// tell session to forget this message and never expect any reply mess.ForgetRequest(); - + AtomicIncrement(NumMessages); - } - - /// this handler should not be called because this server does not send replies + } + + /// this handler should not be called because this server does not send replies void OnSent(TAutoPtr<TBusMessage> mess) override { Y_UNUSED(mess); Y_FAIL("This server does not sent replies"); } -}; - +}; + Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { Y_UNIT_TEST(Simple) { TObjectCountCheck objectCountCheck; diff --git a/library/cpp/messagebus/ya.make b/library/cpp/messagebus/ya.make index e13cf06dea..5d26086a73 100644 --- a/library/cpp/messagebus/ya.make +++ b/library/cpp/messagebus/ya.make @@ -49,7 +49,7 @@ SRCS( use_after_free_checker.cpp use_count_checker.cpp ybus.h -) +) PEERDIR( contrib/libs/sparsehash diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index de21ad8521..77839f0ef1 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -27,23 +27,23 @@ #include <util/generic/buffer.h> #include <util/generic/noncopyable.h> #include <util/generic/ptr.h> -#include <util/stream/input.h> -#include <util/system/atomic.h> +#include <util/stream/input.h> +#include <util/system/atomic.h> #include <util/system/condvar.h> #include <util/system/type_name.h> #include <util/system/event.h> #include <util/system/mutex.h> - -namespace NBus { + +namespace NBus { //////////////////////////////////////////////////////// /// \brief Common structure to store address information - + int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept; bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses ///////////////////////////////////////////////////////////////////////// /// \brief Handles routing and data encoding to/from wire - + /// Protocol is stateless threadsafe singleton object that /// encapsulates relationship between a message (TBusMessage) object /// and destination server. Protocol object is reponsible for serializing in-memory @@ -69,7 +69,7 @@ namespace NBus { TBusService GetService() const { return ServiceName.data(); } - + /// returns port number for destination session to open socket int GetPort() const { return ServicePort; @@ -81,18 +81,18 @@ namespace NBus { /// \brief serialized protocol specific data into TBusData /// \note buffer passed to the function (data) is not empty, use append functions virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0; - + /// deserialized TBusData into new instance of the message virtual TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) = 0; - + /// returns key for messages of this protocol virtual TBusKey GetKey(const TBusMessage*) { return YBUS_KEYMIN; } - + /// default implementation of routing policy to allow overrides virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr); - + /// codec for transport level compression virtual NCodecs::TCodecPtr GetTransportCodec(void) const { return NCodecs::ICodec::GetInstance("snappy"); @@ -101,7 +101,7 @@ namespace NBus { class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> { friend class TBusMessageQueue; - + public: TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session); ~TBusSyncSourceSession(); @@ -139,9 +139,9 @@ namespace NBus { TList<TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl>> Sessions; TSimpleIntrusivePtr<TBusLocator> Locator; NPrivate::TScheduler Scheduler; - + ::NActor::TExecutorPtr WorkQueue; - + TAtomic Running; TSystemEvent ShutdownComplete; @@ -151,12 +151,12 @@ namespace NBus { public: TString GetNameInternal() const; - + ~TBusMessageQueue(); void Stop(); bool IsRunning(); - + public: void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) { WorkQueue->EnqueueWork(w); @@ -180,11 +180,11 @@ namespace NBus { TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = ""); TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = ""); TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name = ""); - + private: void Destroy(TBusSession* session); void Destroy(TBusSyncClientSessionPtr session); - + public: void Schedule(NPrivate::IScheduleItemAutoPtr i); @@ -201,5 +201,5 @@ namespace NBus { TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = ""); TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = ""); TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = ""); - + } |