aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvladimir <vladimir@yandex-team.ru>2022-02-10 16:50:29 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:29 +0300
commit4bac7bacd041dac72ece081598805d03d2e80a3e (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8
parent3e7ff6e4ee637c04455854159e84850e613ebc16 (diff)
downloadydb-4bac7bacd041dac72ece081598805d03d2e80a3e.tar.gz
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--library/cpp/messagebus/coreconn.cpp6
-rw-r--r--library/cpp/messagebus/coreconn.h14
-rw-r--r--library/cpp/messagebus/locator.cpp72
-rw-r--r--library/cpp/messagebus/message.cpp20
-rw-r--r--library/cpp/messagebus/messqueue.cpp30
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp56
-rw-r--r--library/cpp/messagebus/oldmodule/module.h102
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.cpp40
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.h14
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.h32
-rw-r--r--library/cpp/messagebus/session.cpp18
-rw-r--r--library/cpp/messagebus/test/helper/example.h30
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp86
-rw-r--r--library/cpp/messagebus/test/perftest/ya.make18
-rw-r--r--library/cpp/messagebus/test/ut/moduletest.h78
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp126
-rw-r--r--library/cpp/messagebus/ya.make2
-rw-r--r--library/cpp/messagebus/ybus.h38
-rw-r--r--util/network/socket.cpp50
19 files changed, 416 insertions, 416 deletions
diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp
index 7ea9b6cce7..d9411bb5db 100644
--- a/library/cpp/messagebus/coreconn.cpp
+++ b/library/cpp/messagebus/coreconn.cpp
@@ -12,7 +12,7 @@ namespace NBus {
TBusInstant Now() {
return millisec();
}
-
+
EIpVersion MakeIpVersion(bool allowIpv4, bool allowIpv6) {
if (allowIpv4) {
if (allowIpv6) {
@@ -23,8 +23,8 @@ namespace NBus {
} else if (allowIpv6) {
return EIP_VERSION_6;
}
-
+
ythrow yexception() << "Neither of IPv4/IPv6 is allowed.";
}
-}
+}
diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h
index 36f6e90165..fca228d82e 100644
--- a/library/cpp/messagebus/coreconn.h
+++ b/library/cpp/messagebus/coreconn.h
@@ -1,8 +1,8 @@
#pragma once
-//////////////////////////////////////////////////////////////
-/// \file
-/// \brief Definitions for asynchonous connection queue
+//////////////////////////////////////////////////////////////
+/// \file
+/// \brief Definitions for asynchonous connection queue
#include "base.h"
#include "event_loop.h"
@@ -21,7 +21,7 @@
#include <util/string/util.h>
#include <util/system/condvar.h>
#include <util/system/mutex.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/thread/lfqueue.h>
#include <deque>
@@ -38,12 +38,12 @@ namespace NBus {
class TBusConnection;
class TBusConnectionFactory;
class TBusServerFactory;
-
+
using TBusConnectionList = TList<TBusConnection*>;
-
+
/// @throw yexception
EIpVersion MakeIpVersion(bool allowIpv4, bool allowIpv6);
-
+
inline bool WouldBlock() {
int syserr = LastSystemError();
return syserr == EAGAIN || syserr == EINPROGRESS || syserr == EWOULDBLOCK || syserr == EINTR;
diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp
index 0453454680..e38a35c426 100644
--- a/library/cpp/messagebus/locator.cpp
+++ b/library/cpp/messagebus/locator.cpp
@@ -1,9 +1,9 @@
-////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////
/// \file
-/// \brief Implementation of locator service
-
+/// \brief Implementation of locator service
+
#include "locator.h"
-
+
#include "ybus.h"
#include <util/generic/hash_set.h>
@@ -11,7 +11,7 @@
namespace NBus {
using namespace NAddr;
-
+
static TIpPort GetAddrPort(const IRemoteAddr& addr) {
switch (addr.Addr()->sa_family) {
case AF_INET: {
@@ -84,8 +84,8 @@ namespace NBus {
static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
return (const sockaddr_in6*)a.Addr();
- }
-
+ }
+
static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) {
if (a1.Addr()->sa_family == a2.Addr()->sa_family) {
if (a1.Addr()->sa_family == AF_INET) {
@@ -95,13 +95,13 @@ namespace NBus {
}
}
return false;
- }
+ }
TBusLocator::TBusLocator()
: MyInterfaces(GetNetworkInterfaces())
{
}
-
+
bool TBusLocator::TItem::operator<(const TItem& y) const {
const TItem& x = *this;
@@ -114,7 +114,7 @@ namespace NBus {
bool TBusLocator::TItem::operator==(const TItem& y) const {
return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr;
}
-
+
TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr)
: ServiceId(serviceId)
, Start(start)
@@ -122,7 +122,7 @@ namespace NBus {
, Addr(addr)
{
}
-
+
bool TBusLocator::IsLocal(const TNetAddr& addr) {
for (const auto& myInterface : MyInterfaces) {
if (IsAddressEqual(addr, *myInterface.Address)) {
@@ -132,7 +132,7 @@ namespace NBus {
return false;
}
-
+
TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) {
const char* c = ServiceIdSet.insert(name).first->c_str();
return (ui64)c;
@@ -140,7 +140,7 @@ namespace NBus {
int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
TGuard<TMutex> G(Lock);
-
+
TServiceId serviceId = GetServiceId(service);
for (size_t i = 0; i < starts.size(); ++i) {
RegisterBreak(serviceId, starts[i], addr);
@@ -152,7 +152,7 @@ namespace NBus {
TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr));
TItems::const_iterator service_it =
Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr()));
-
+
THolder<TItem> left;
THolder<TItem> right;
if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) {
@@ -174,11 +174,11 @@ namespace NBus {
Items.insert(*right);
NormalizeBreaks(serviceId);
return 0;
- }
-
+ }
+
int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) {
TGuard<TMutex> G(Lock);
-
+
TServiceId serviceId = GetServiceId(service);
return UnregisterBreak(serviceId, addr);
}
@@ -225,7 +225,7 @@ namespace NBus {
TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr);
Items.insert(item);
}
-
+
NormalizeBreaks(serviceId);
return deleted;
}
@@ -238,7 +238,7 @@ namespace NBus {
if (serviceId != Max<TServiceId>()) {
last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
}
-
+
--last;
Y_ASSERT(Items.end() != last);
Y_ASSERT(last->ServiceId == serviceId);
@@ -251,10 +251,10 @@ namespace NBus {
int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) {
TGuard<TMutex> G(Lock);
Y_VERIFY(addrs.empty(), "Non emtpy addresses");
-
+
TServiceId serviceId = GetServiceId(service);
TItems::const_iterator it;
-
+
for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End;
++it) {
@@ -273,27 +273,27 @@ namespace NBus {
TServiceId serviceId = GetServiceId(service);
TItems::const_iterator it;
-
+
it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
-
+
if (it != Items.end()) {
const TItem& item = *it;
if (item.ServiceId == serviceId && item.Start <= key && key < item.End) {
*addr = item.Addr;
-
+
return 0;
}
}
return -1;
- }
-
+ }
+
int TBusLocator::GetLocalPort(TBusService service) {
TGuard<TMutex> G(Lock);
TServiceId serviceId = GetServiceId(service);
TItems::const_iterator it;
int port = 0;
-
+
for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
const TItem& item = *it;
if (item.ServiceId != serviceId) {
@@ -357,8 +357,8 @@ namespace NBus {
*isLocal = IsLocal(addr);
}
return 0;
- }
-
+ }
+
int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
TGuard<TMutex> G(Lock);
Y_VERIFY(keys.empty(), "Non empty keys");
@@ -376,8 +376,8 @@ namespace NBus {
keys.push_back(std::make_pair(item.Start, item.End));
}
return (int)keys.size();
- }
-
+ }
+
int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws
{
@@ -387,13 +387,13 @@ namespace NBus {
Register(service, start, end, addr);
return 0;
}
-
+
int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
TNetAddr addr(na, requireVersion, preferVersion); // throws
Register(service, start, end, addr);
return 0;
- }
-
+ }
+
int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) {
TGuard<TMutex> G(Lock);
@@ -412,7 +412,7 @@ namespace NBus {
Y_FAIL("Overlap in registered keys with non-identical range");
}
}
-
+
Items.insert(itemToReg);
return 0;
}
@@ -424,4 +424,4 @@ namespace NBus {
return 0;
}
-}
+}
diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp
index e4d483901d..bfa7ed8e9b 100644
--- a/library/cpp/messagebus/message.cpp
+++ b/library/cpp/messagebus/message.cpp
@@ -3,15 +3,15 @@
#include <util/random/random.h>
#include <util/string/printf.h>
-#include <util/system/atomic.h>
-
+#include <util/system/atomic.h>
+
#include <string.h>
-
+
using namespace NBus;
namespace NBus {
using namespace NBus::NPrivate;
-
+
TBusIdentity::TBusIdentity()
: MessageId(0)
, Size(0)
@@ -73,7 +73,7 @@ namespace NBus {
GetHeader()->Type = type;
DoReset();
}
-
+
TBusMessage::TBusMessage(ECreateUninitialized)
//: TCtr("BusMessage")
: TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
@@ -81,11 +81,11 @@ namespace NBus {
, Data(nullptr)
{
}
-
+
TString TBusMessage::Describe() const {
return Sprintf("object type: %s, message type: %d", TypeName(*this).data(), int(GetHeader()->Type));
}
-
+
TBusMessage::~TBusMessage() {
#ifndef NDEBUG
Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
@@ -94,7 +94,7 @@ namespace NBus {
CheckClean();
#endif
}
-
+
void TBusMessage::DoReset() {
GetHeader()->SendTime = 0;
GetHeader()->Size = 0;
@@ -125,7 +125,7 @@ namespace NBus {
memcpy(this, data.data(), sizeof(TBusHeader));
return sizeof(TBusHeader);
}
-
+
///////////////////////////////////////////////////////
/// \brief Packs header to network order
@@ -140,7 +140,7 @@ namespace NBus {
data.Flags = GetHeader()->FlagsInternal;
//data.LocalFlags = LocalFlags;
}
-
+
////////////////////////////////////////////////////////////
/// \brief set message identity from serialized form
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 8040dbf69e..3474d62705 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -3,13 +3,13 @@
#include "remote_client_session.h"
#include "remote_server_session.h"
#include "ybus.h"
-
+
#include <util/generic/singleton.h>
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NActor;
-
+
TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) {
return new TBusMessageQueue(config, executor, locator, name);
}
@@ -20,8 +20,8 @@ TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBus
executorConfig.Name = name;
TExecutorPtr executor = new TExecutor(executorConfig);
return CreateMessageQueue(config, executor, locator, name);
-}
-
+}
+
TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) {
return CreateMessageQueue(config, new TBusLocator, name);
}
@@ -50,15 +50,15 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e
, Locator(locator)
, WorkQueue(executor)
, Running(1)
-{
+{
InitBusLwtrace();
InitNetworkSubSystem();
-}
-
-TBusMessageQueue::~TBusMessageQueue() {
+}
+
+TBusMessageQueue::~TBusMessageQueue() {
Stop();
-}
-
+}
+
void TBusMessageQueue::Stop() {
if (!AtomicCas(&Running, 0, 1)) {
ShutdownComplete.WaitI();
@@ -128,8 +128,8 @@ TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusCli
TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
Add(session.Get());
return session.Get();
-}
-
+}
+
TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name));
try {
@@ -140,7 +140,7 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
if (port == 0) {
port = proto->GetPort();
}
-
+
session->Listen(port, this);
Add(session.Get());
@@ -164,8 +164,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {
TGuard<TMutex> scope(Lock);
Sessions.push_back(session);
-}
-
+}
+
void TBusMessageQueue::Remove(TBusSession* session) {
TGuard<TMutex> scope(Lock);
TList<TIntrusivePtr<TBusSessionImpl>>::iterator it = std::find(Sessions.begin(), Sessions.end(), session);
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 3abcbfc87b..24bd778799 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -1,5 +1,5 @@
#include "module.h"
-
+
#include <library/cpp/messagebus/scheduler_actor.h>
#include <library/cpp/messagebus/thread_extra.h>
#include <library/cpp/messagebus/actor/actor.h>
@@ -64,7 +64,7 @@ namespace NBus {
namespace NPrivate {
class TJobStorage {
};
-
+
struct TModuleClientHandler
: public IBusClientHandler {
TModuleClientHandler(TBusModuleImpl* module)
@@ -334,16 +334,16 @@ namespace NBus {
ClearAllMessageStates();
}
-
+
TNetAddr TBusJob::GetPeerAddrNetAddr() const {
Y_VERIFY(!!OnMessageContext);
return OnMessageContext.GetPeerAddrNetAddr();
}
-
+
void TBusJob::CheckThreadCurrentJob() {
Y_ASSERT(ThreadCurrentJob == this);
}
-
+
/////////////////////////////////////////////////////////
/// \brief Send messages in pending list
@@ -405,17 +405,17 @@ namespace NBus {
}
return Pending.size() > 0;
}
-
+
bool TBusJob::AnyPendingToSend() {
for (unsigned i = 0; i < Pending.size(); ++i) {
if (Pending[i].Status == MESSAGE_DONT_ASK) {
return true;
}
- }
+ }
return false;
- }
-
+ }
+
bool TBusJob::IsDone() {
bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
return r;
@@ -427,7 +427,7 @@ namespace NBus {
Handler = Handler(ModuleImpl->Module, this, Message);
}
-
+
bool TBusJob::CallJobHandler() {
/// go on as far as we can go without waiting
while (!IsDone()) {
@@ -465,9 +465,9 @@ namespace NBus {
TThreadCurrentJobGuard threadCurrentJobGuard(this);
(Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
- }
+ }
}
-
+
int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
/// find handler for given message and update it's status
size_t i = 0;
@@ -476,34 +476,34 @@ namespace NBus {
if (call.Message == mess) {
break;
}
- }
+ }
/// if not found, report error
if (i == Pending.size()) {
Y_FAIL("must not happen");
}
-
+
/// fill in response into job state
TJobState& call = Pending[i];
call.Status = status;
Y_ASSERT(call.Message == mess);
call.Reply = reply;
-
+
if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
call.NumRetries++;
call.Status = MESSAGE_DONT_ASK;
call.Message->Reset(); // generate new Id
DoCallReplyHandler(call);
return 0;
- }
+ }
/// call the handler if provided
DoCallReplyHandler(call);
-
+
/// move job state into the finished stack
Finished.push_back(Pending[i]);
Pending.erase(Pending.begin() + i);
-
+
return 0;
}
@@ -511,21 +511,21 @@ namespace NBus {
/// send message to any other session or application
void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
CheckThreadCurrentJob();
-
+
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
}
void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
CheckThreadCurrentJob();
-
+
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
}
void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
CheckThreadCurrentJob();
-
+
SetJob(req.Get(), Runner);
Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
}
@@ -559,7 +559,7 @@ namespace NBus {
Status = status;
}
-
+
void TBusJob::ClearState(TJobState& call) {
TJobStateVec::iterator it;
for (it = Finished.begin(); it != Finished.end(); ++it) {
@@ -569,10 +569,10 @@ namespace NBus {
Finished.erase(it);
return;
}
- }
+ }
Y_ASSERT(0);
- }
-
+ }
+
void TBusJob::ClearAllMessageStates() {
ClearJobStateVector(&Finished);
ClearJobStateVector(&Pending);
@@ -586,7 +586,7 @@ namespace NBus {
SleepUntil = Now() + milliSeconds;
}
-
+
TString TBusJob::GetStatus(unsigned flags) {
TString strReturn;
strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
@@ -624,8 +624,8 @@ namespace NBus {
if (job) {
job->Cancel(status);
}
- }
-
+ }
+
TString TBusModuleImpl::GetStatus(unsigned flags) {
Y_UNUSED(flags);
TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index e263d1b618..8d1c4a5d52 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -1,55 +1,55 @@
#pragma once
-///////////////////////////////////////////////////////////////////////////
-/// \file
-/// \brief Application interface for modules
+///////////////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Application interface for modules
-/// NBus::TBusModule provides foundation for implementation of asynchnous
-/// modules that communicate with multiple external or local sessions
+/// NBus::TBusModule provides foundation for implementation of asynchnous
+/// modules that communicate with multiple external or local sessions
/// NBus::TBusSession.
/// To implement the module some virtual functions needs to be overridden:
/// NBus::TBusModule::CreateExtSession() creates and registers an
-/// external session that receives incoming messages as input for module
+/// external session that receives incoming messages as input for module
/// processing.
/// When new incoming message arrives the new NBus::TBusJob is created.
-/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
+/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
/// during processing of one incoming message. Default implementation of
/// NBus::TBusJob will maintain all send and received messages during
/// lifetime of this job. Each message, status and reply can be found
-/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
-/// needs to maintain an additional information during lifetime of the job
+/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
+/// needs to maintain an additional information during lifetime of the job
/// you can derive your own class from NBus::TBusJob and override job
/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
/// Processing of a given message starts with a call to NBus::TBusModule::Start()
/// handler that should be overridden in the module implementation. Within
-/// the callback handler module can perform any computation and access any
-/// datastore tables that it needs. The handler can also access any module
+/// the callback handler module can perform any computation and access any
+/// datastore tables that it needs. The handler can also access any module
/// variables. However, same handler can be called from multiple threads so,
/// it is recommended that handler only access read-only module level variables.
/// Handler should use NBus::TBusJob::Send() to send messages to other client
-/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
+/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
/// job message. When handler is done, it returns the pointer to the next handler to call
-/// when all pending messages have cleared. If handler
-/// returns pointer to itself the module will reschedule execution of this handler
-/// for a later time. This should be done in case NBus::TBusJob::Send() returns
-/// error (not MESSAGE_OK)
-
+/// when all pending messages have cleared. If handler
+/// returns pointer to itself the module will reschedule execution of this handler
+/// for a later time. This should be done in case NBus::TBusJob::Send() returns
+/// error (not MESSAGE_OK)
+
#include "startsession.h"
#include <library/cpp/messagebus/ybus.h>
-
+
#include <util/generic/noncopyable.h>
#include <util/generic/object_counter.h>
-namespace NBus {
+namespace NBus {
class TBusJob;
class TBusModule;
-
+
namespace NPrivate {
struct TCallJobHandlerWorkItem;
struct TBusModuleImpl;
@@ -57,7 +57,7 @@ namespace NBus {
struct TModuleClientHandler;
struct TJobRunner;
}
-
+
class TJobHandler {
protected:
typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
@@ -86,12 +86,12 @@ namespace NBus {
return (b->*MyPtr)(job, mess);
}
};
-
+
typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
-
+
////////////////////////////////////////////////////
/// \brief Pending message state
-
+
struct TJobState {
friend class TBusJob;
friend class ::TCrawlerModule;
@@ -131,9 +131,9 @@ namespace NBus {
public:
TString GetStatus(unsigned flags);
};
-
+
using TJobStateVec = TVector<TJobState>;
-
+
/////////////////////////////////////////////////////////
/// \brief Execution item = thread
@@ -147,10 +147,10 @@ namespace NBus {
public:
/// given a module and starter message
TBusJob(TBusModule* module, TBusMessage* message);
-
+
/// destructor will free all the message that were send and received
virtual ~TBusJob();
-
+
TBusMessage* GetMessage() const {
return Message;
}
@@ -161,13 +161,13 @@ namespace NBus {
/// If addr is set then use it as destination.
void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
-
+
void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
/// send reply to the starter message
virtual void SendReply(TBusMessageAutoPtr reply);
-
+
/// set the flag to terminate job at the earliest convenience
void Cancel(EMessageStatus status);
@@ -179,7 +179,7 @@ namespace NBus {
void PutState(const TJobState& state) {
Finished.push_back(state);
}
-
+
public:
/// retrieve all pending messages
void GetPending(TJobStateVec* stateVec) {
@@ -225,10 +225,10 @@ namespace NBus {
}
return static_cast<MessageType*>(call.Message);
}
- }
+ }
return nullptr;
- }
-
+ }
+
/// helper function to find status for previously sent message
template <class MessageType>
EMessageStatus GetStatus(int* startFrom = nullptr) {
@@ -240,10 +240,10 @@ namespace NBus {
}
return call.Status;
}
- }
+ }
return MESSAGE_UNKNOWN;
- }
-
+ }
+
/// helper function to clear state of previosly sent messages
template <class MessageType>
void Clear() {
@@ -256,24 +256,24 @@ namespace NBus {
} else {
++i;
}
- }
- }
-
+ }
+ }
+
/// helper function to clear state in order to try again
void ClearState(TJobState& state);
-
+
/// clears all message states
void ClearAllMessageStates();
/// returns true if job is done
bool IsDone();
-
+
/// return human reabable status of this job
virtual TString GetStatus(unsigned flags);
/// set sleep time for job
void Sleep(int milliSeconds);
-
+
void CallJobHandlerOnly();
private:
@@ -297,7 +297,7 @@ namespace NBus {
TOnMessageContext OnMessageContext; // starter
public:
bool ReplySent;
-
+
private:
friend class TBusModule;
friend struct NPrivate::TBusModuleImpl;
@@ -312,7 +312,7 @@ namespace NBus {
NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep
};
-
+
////////////////////////////////////////////////////////////////////
/// \brief Classes to implement basic module functionality
@@ -358,18 +358,18 @@ namespace NBus {
friend class TBusJob;
TObjectCounter<TBusModule> ObjectCounter;
-
+
TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
-
+
public:
/// Each module should have a name which is used as protocol service
TBusModule(const char* name);
~TBusModule() override;
const char* GetName() const;
-
+
void SetConfig(const TBusModuleConfig& config);
-
+
/// get status of all jobs in flight
TString GetStatus(unsigned flags = 0);
@@ -380,7 +380,7 @@ namespace NBus {
// this default implementation just creates TBusJob object
TBusJob* CreateJobInstance(TBusMessage* message) override;
-
+
EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
/// creates private sessions, calls CreateExtSession(), should be called before StartInput()
@@ -391,7 +391,7 @@ namespace NBus {
public:
/// entry point into module, first function to call
virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
-
+
protected:
/// override this function to create destination session
virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;
diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp
index 8126e11530..7c38801d62 100644
--- a/library/cpp/messagebus/oldmodule/startsession.cpp
+++ b/library/cpp/messagebus/oldmodule/startsession.cpp
@@ -1,19 +1,19 @@
-///////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////
/// \file
-/// \brief Starter session implementation
+/// \brief Starter session implementation
-/// Starter session will generate emtpy message to insert
-/// into local session that are registered under same protocol
+/// Starter session will generate emtpy message to insert
+/// into local session that are registered under same protocol
-/// Starter (will one day) automatically adjust number
-/// of message inflight to make sure that at least one of source
-/// sessions within message queue is at the limit (bottle neck)
+/// Starter (will one day) automatically adjust number
+/// of message inflight to make sure that at least one of source
+/// sessions within message queue is at the limit (bottle neck)
+
+/// Maximum number of messages that starter will instert into
+/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight
+
+#include "startsession.h"
-/// Maximum number of messages that starter will instert into
-/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight
-
-#include "startsession.h"
-
#include "module.h"
#include <library/cpp/messagebus/ybus.h>
@@ -24,7 +24,7 @@ namespace NBus {
pThis->Starter();
return nullptr;
}
-
+
TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)
: Module(module)
, Config(config)
@@ -33,11 +33,11 @@ namespace NBus {
{
StartThread.Start();
}
-
+
TBusStarter::~TBusStarter() {
Shutdown();
}
-
+
void TBusStarter::Shutdown() {
{
TGuard<TMutex> g(ExitLock);
@@ -46,20 +46,20 @@ namespace NBus {
}
StartThread.Join();
}
-
+
void TBusStarter::Starter() {
TGuard<TMutex> g(ExitLock);
while (!Exiting) {
TAutoPtr<TBusMessage> empty(new TBusMessage(0));
-
+
EMessageStatus status = Module->StartJob(empty);
-
+
if (Config.SendTimeout > 0) {
ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));
} else {
ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());
}
- }
- }
+ }
+ }
}
diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h
index c6b407743d..5e26e7e1e5 100644
--- a/library/cpp/messagebus/oldmodule/startsession.h
+++ b/library/cpp/messagebus/oldmodule/startsession.h
@@ -1,12 +1,12 @@
#pragma once
-
+
#include <library/cpp/messagebus/ybus.h>
-#include <util/system/thread.h>
-
-namespace NBus {
+#include <util/system/thread.h>
+
+namespace NBus {
class TBusModule;
-
+
class TBusStarter {
private:
TBusModule* Module;
@@ -23,12 +23,12 @@ namespace NBus {
TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) {
return "";
}
-
+
public:
TBusStarter(TBusModule* module, const TBusSessionConfig& config);
~TBusStarter();
void Shutdown();
};
-
+
}
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h
index 7e3a125868..57b4267ea5 100644
--- a/library/cpp/messagebus/protobuf/ybusbuf.h
+++ b/library/cpp/messagebus/protobuf/ybusbuf.h
@@ -1,24 +1,24 @@
#pragma once
-
+
#include <library/cpp/messagebus/ybus.h>
-
+
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
-
+
#include <util/generic/cast.h>
#include <util/generic/vector.h>
#include <util/stream/mem.h>
-
+
#include <array>
-namespace NBus {
+namespace NBus {
using TBusBufferRecord = ::google::protobuf::Message;
-
+
template <class TBufferMessage>
class TBusBufferMessagePtr;
template <class TBufferMessage>
class TBusBufferMessageAutoPtr;
-
+
class TBusBufferBase: public TBusMessage {
public:
TBusBufferBase(int type)
@@ -43,7 +43,7 @@ namespace NBus {
/// @param TBufferRecord is record described in .proto file with namespace
/// @param MessageFile is offset for .proto file message ids
-
+
/// \attention If you want one protocol NBus::TBusBufferProtocol to handle
/// messageges described in different .proto files, make sure that they have
/// unique values for MessageFile
@@ -52,7 +52,7 @@ namespace NBus {
class TBusBufferMessage: public TBusBufferBase {
public:
static const int MessageType = MType;
-
+
typedef TBusBufferMessagePtr<TBusBufferMessage<TBufferRecord, MType>> TPtr;
typedef TBusBufferMessageAutoPtr<TBusBufferMessage<TBufferRecord, MType>> TAutoPtr;
@@ -88,7 +88,7 @@ namespace NBus {
return new TBusBufferMessage<TBufferRecord, MessageType>();
}
};
-
+
template <class TSelf, class TBufferMessage>
class TBusBufferMessagePtrBase {
public:
@@ -101,7 +101,7 @@ namespace NBus {
const TSelf* GetSelf() const {
return static_cast<const TSelf*>(this);
}
-
+
public:
RecordType* operator->() {
Y_ASSERT(GetSelf()->Get());
@@ -119,7 +119,7 @@ namespace NBus {
Y_ASSERT(GetSelf()->Get());
return GetSelf()->Get()->Record;
}
-
+
TBusHeader* GetHeader() {
return GetSelf()->Get()->GetHeader();
}
@@ -127,7 +127,7 @@ namespace NBus {
return GetSelf()->Get()->GetHeader();
}
};
-
+
template <class TBufferMessage>
class TBusBufferMessagePtr: public TBusBufferMessagePtrBase<TBusBufferMessagePtr<TBufferMessage>, TBufferMessage> {
protected:
@@ -179,7 +179,7 @@ namespace NBus {
: AutoPtr(message)
{
}
-
+
TBufferMessage* Get() {
return AutoPtr.Get();
}
@@ -198,7 +198,7 @@ namespace NBus {
return AutoPtr.Release();
}
};
-
+
/////////////////////////////////////////////
/// \brief Generic protocol object for messages descibed with protobuf
@@ -223,7 +223,7 @@ namespace NBus {
void RegisterType(TAutoPtr<TBusBufferBase> mess);
TArrayRef<TBusBufferBase* const> GetTypes() const;
-
+
/// serialized protocol specific data into TBusData
void Serialize(const TBusMessage* mess, TBuffer& data) override;
diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp
index 142fba2c71..46a7ece6a8 100644
--- a/library/cpp/messagebus/session.cpp
+++ b/library/cpp/messagebus/session.cpp
@@ -7,7 +7,7 @@ using namespace NBus;
namespace NBus {
TBusSession::TBusSession() {
}
-
+
////////////////////////////////////////////////////////////////////
/// \brief Adds peer of connection into connection list
@@ -20,7 +20,7 @@ namespace NBus {
case AF_INET: {
return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr));
}
-
+
case AF_INET6: {
return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr));
}
@@ -32,13 +32,13 @@ namespace NBus {
bool operator<(const TNetAddr& a1, const TNetAddr& a2) {
return CompareByHost(a1, a2) < 0;
}
-
+
size_t TBusSession::GetInFlight(const TNetAddr& addr) const {
size_t r;
GetInFlightBulk({addr}, MakeArrayRef(&r, 1));
return r;
}
-
+
size_t TBusSession::GetConnectSyscallsNumForTest(const TNetAddr& addr) const {
size_t r;
GetConnectSyscallsNumBulkForTest({addr}, MakeArrayRef(&r, 1));
@@ -107,16 +107,16 @@ namespace NBus {
return -1;
}
}
-
+
TBusService service = GetProto()->GetService();
return GetQueue()->GetLocator()->Register(service, hostName.data(), portNum, start, end, ipVersion);
- }
-
+ }
+
TBusSession::~TBusSession() {
}
-}
-
+}
+
TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue) {
return queue->CreateSource(proto, handler, config);
}
diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h
index 5aa9b53df0..26b7475308 100644
--- a/library/cpp/messagebus/test/helper/example.h
+++ b/library/cpp/messagebus/test/helper/example.h
@@ -1,10 +1,10 @@
#pragma once
-
+
#include <library/cpp/testing/unittest/registar.h>
#include "alloc_counter.h"
#include "message_handler_error.h"
-
+
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/misc/test_sync.h>
@@ -14,13 +14,13 @@ namespace NBus {
namespace NTest {
class TExampleRequest: public TBusMessage {
friend class TExampleProtocol;
-
+
private:
TAllocCounter AllocCounter;
public:
TString Data;
-
+
public:
TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
@@ -28,10 +28,10 @@ namespace NBus {
class TExampleResponse: public TBusMessage {
friend class TExampleProtocol;
-
+
private:
TAllocCounter AllocCounter;
-
+
public:
TString Data;
TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
@@ -47,11 +47,11 @@ namespace NBus {
TAtomic StartCount;
TExampleProtocol(int port = 0);
-
+
~TExampleProtocol() override;
-
+
void Serialize(const TBusMessage* message, TBuffer& buffer) override;
-
+
TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
};
@@ -77,7 +77,7 @@ namespace NBus {
~TExampleClient() override;
EMessageStatus SendMessage(const TNetAddr* addr = nullptr);
-
+
void SendMessages(size_t count, const TNetAddr* addr = nullptr);
void SendMessages(size_t count, const TNetAddr& addr);
@@ -90,7 +90,7 @@ namespace NBus {
void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);
void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;
-
+
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
};
@@ -101,12 +101,12 @@ namespace NBus {
bool AckMessageBeforeSendReply;
TMaybe<size_t> DataSize; // Nothing means use request size
bool ForgetRequest;
-
+
TTestSync TestSync;
TBusMessageQueuePtr Bus;
TBusServerSessionPtr Session;
-
+
public:
TExampleServer(
const char* name = "TExampleServer",
@@ -115,7 +115,7 @@ namespace NBus {
TExampleServer(unsigned port, const char* name = "TExampleServer");
~TExampleServer() override;
-
+
public:
size_t GetInFlight() const;
unsigned GetActualListenPort() const;
@@ -127,6 +127,6 @@ namespace NBus {
protected:
void OnMessage(TOnMessageContext& mess) override;
};
-
+
}
}
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index 6ca846099f..8489319278 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -1,5 +1,5 @@
#include "simple_proto.h"
-
+
#include <library/cpp/messagebus/test/perftest/messages.pb.h>
#include <library/cpp/messagebus/text_utils.h>
@@ -448,66 +448,66 @@ private:
}
};
-// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000
-
-using namespace std;
-using namespace NBus;
-
+// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000
+
+using namespace std;
+using namespace NBus;
+
static TNetworkAddress ParseNetworkAddress(const char* string) {
TString Name;
int Port;
-
+
const char* port = strchr(string, ':');
-
+
if (port != nullptr) {
Name.append(string, port - string);
- Port = atoi(port + 1);
+ Port = atoi(port + 1);
} else {
Name.append(string);
Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
- }
-
+ }
+
return TNetworkAddress(Name, Port);
}
TVector<TNetAddr> ParseNodes(const TString nodes) {
TVector<TNetAddr> r;
-
+
TVector<TString> hosts;
-
+
size_t numh = Split(nodes.data(), ",", hosts);
-
- for (int i = 0; i < int(numh); i++) {
+
+ for (int i = 0; i < int(numh); i++) {
const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data());
Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses");
r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin()));
- }
-
+ }
+
return r;
-}
-
+}
+
TPerftestConfig::TPerftestConfig() {
TBusSessionConfig defaultConfig;
ServerPort = DEFAULT_PORT;
- Delay = 0; // artificial delay inside server OnMessage()
+ Delay = 0; // artificial delay inside server OnMessage()
MessageSize = 200;
- Failure = 0.00;
- Run = 60; // in seconds
- Nodes = "localhost";
+ Failure = 0.00;
+ Run = 60; // in seconds
+ Nodes = "localhost";
ServerUseModules = false;
ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool;
ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool;
UseCompression = false;
Profile = false;
WwwPort = 0;
-}
-
+}
+
TPerftestConfig* TheConfig = new TPerftestConfig();
bool TheExit = false;
TSystemEvent StopEvent;
-
+
TSimpleSharedPtr<TPerftestServer> Server;
TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
@@ -516,13 +516,13 @@ TMutex ClientsLock;
void stopsignal(int /*sig*/) {
fprintf(stderr, "\n-------------------- exiting ------------------\n");
- TheExit = true;
+ TheExit = true;
StopEvent.Signal();
-}
-
-// -s <num> - start server on port <num>
-// -c <node:port,node:port> - start client
-
+}
+
+// -s <num> - start server on port <num>
+// -c <node:port,node:port> - start client
+
void TTestStats::PeriodicallyPrint() {
SetCurrentThreadName("print-stats");
@@ -589,7 +589,7 @@ void TTestStats::PeriodicallyPrint() {
int main(int argc, char* argv[]) {
NLWTrace::StartLwtraceFromEnv();
-
+
/* unix foo */
setvbuf(stdout, nullptr, _IONBF, 0);
setvbuf(stderr, nullptr, _IONBF, 0);
@@ -600,7 +600,7 @@ int main(int argc, char* argv[]) {
SetAsyncSignalHandler(SIGUSR1, stopsignal);
#endif
signal(SIGPIPE, SIG_IGN);
-
+
NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
@@ -621,7 +621,7 @@ int main(int argc, char* argv[]) {
opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort);
opts.AddHelpOption();
-
+
Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-");
Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-");
Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-");
@@ -631,9 +631,9 @@ int main(int argc, char* argv[]) {
NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
- TheConfig->Print();
+ TheConfig->Print();
Config.Print();
-
+
if (TheConfig->Profile) {
BeginProfiling();
}
@@ -642,7 +642,7 @@ int main(int argc, char* argv[]) {
ServerAddresses = ParseNodes(TheConfig->Nodes);
- if (TheConfig->ServerPort) {
+ if (TheConfig->ServerPort) {
if (TheConfig->ServerUseModules) {
ServerUsingModule = new TPerftestUsingModule();
www->RegisterModule(ServerUsingModule.Get());
@@ -650,7 +650,7 @@ int main(int argc, char* argv[]) {
Server = new TPerftestServer();
www->RegisterServerSession(Server->Session);
}
- }
+ }
TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
@@ -661,8 +661,8 @@ int main(int argc, char* argv[]) {
futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back())));
www->RegisterClientSession(Clients.back()->Session);
}
- }
-
+ }
+
futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats))));
THolder<TBusWwwHttpServer> wwwServer;
@@ -709,5 +709,5 @@ int main(int argc, char* argv[]) {
}
Cerr << "***SUCCESS***\n";
- return 0;
-}
+ return 0;
+}
diff --git a/library/cpp/messagebus/test/perftest/ya.make b/library/cpp/messagebus/test/perftest/ya.make
index 0d4288cee9..24c2848ed5 100644
--- a/library/cpp/messagebus/test/perftest/ya.make
+++ b/library/cpp/messagebus/test/perftest/ya.make
@@ -1,8 +1,8 @@
PROGRAM(messagebus_perftest)
-
+
OWNER(g:messagebus)
-PEERDIR(
+PEERDIR(
library/cpp/deprecated/threadable
library/cpp/execprofile
library/cpp/getopt
@@ -13,12 +13,12 @@ PEERDIR(
library/cpp/messagebus/www
library/cpp/sighandler
library/cpp/threading/future
-)
-
-SRCS(
+)
+
+SRCS(
messages.proto
- perftest.cpp
+ perftest.cpp
simple_proto.cpp
-)
-
-END()
+)
+
+END()
diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h
index a128d3ab21..d5da72c0cb 100644
--- a/library/cpp/messagebus/test/ut/moduletest.h
+++ b/library/cpp/messagebus/test/ut/moduletest.h
@@ -1,9 +1,9 @@
#pragma once
-
-///////////////////////////////////////////////////////////////////
-/// \file
-/// \brief Example of using local session for communication.
-
+
+///////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Example of using local session for communication.
+
#include <library/cpp/messagebus/test/helper/alloc_counter.h>
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/message_handler_error.h>
@@ -14,10 +14,10 @@
namespace NBus {
namespace NTest {
using namespace std;
-
+
#define TYPE_HOSTINFOREQUEST 100
-#define TYPE_HOSTINFORESPONSE 101
-
+#define TYPE_HOSTINFORESPONSE 101
+
////////////////////////////////////////////////////////////////////
/// \brief DupDetect protocol that common between client and server
////////////////////////////////////////////////////////////////////
@@ -32,11 +32,11 @@ namespace NBus {
: TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
{
}
-
+
~THostInfoMessage() override {
}
};
-
+
////////////////////////////////////////////////////////////////////
/// \brief HostInfo reply class
class THostInfoReply: public TBusMessage {
@@ -49,11 +49,11 @@ namespace NBus {
: TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
{
}
-
+
~THostInfoReply() override {
}
};
-
+
////////////////////////////////////////////////////////////////////
/// \brief HostInfo protocol that common between client and server
class THostInfoProtocol: public TBusProtocol {
@@ -67,7 +67,7 @@ namespace NBus {
Y_UNUSED(data);
Y_UNUSED(mess);
}
-
+
/// deserialized TBusData into new instance of the message
TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
Y_UNUSED(payload);
@@ -81,23 +81,23 @@ namespace NBus {
}
}
};
-
+
//////////////////////////////////////////////////////////////
/// \brief HostInfo handler (should convert it to module too)
struct THostInfoHandler: public TBusServerHandlerError {
TBusServerSessionPtr Session;
TBusServerSessionConfig HostInfoConfig;
THostInfoProtocol HostInfoProto;
-
+
THostInfoHandler(TBusMessageQueue* queue) {
Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue);
}
-
+
void OnMessage(TOnMessageContext& mess) override {
usleep(10 * 1000); /// pretend we are doing something
-
+
TAutoPtr<THostInfoReply> reply(new THostInfoReply());
-
+
mess.SendReplyMove(reply);
}
@@ -105,7 +105,7 @@ namespace NBus {
return TNetAddr("localhost", Session->GetActualListenPort());
}
};
-
+
//////////////////////////////////////////////////////////////
/// \brief DupDetect handler (should convert it to module too)
struct TDupDetectHandler: public TBusClientHandlerError {
@@ -114,34 +114,34 @@ namespace NBus {
TBusClientSessionPtr DupDetect;
TBusClientSessionConfig DupDetectConfig;
TExampleProtocol DupDetectProto;
-
+
int NumMessages;
int NumReplies;
-
+
TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue)
: ServerAddr(serverAddr)
{
DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue);
DupDetect->RegisterService("localhost");
}
-
+
void Work() {
NumMessages = 10;
NumReplies = 0;
-
+
for (int i = 0; i < NumMessages; i++) {
TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount);
DupDetect->SendMessage(mess, &ServerAddr);
}
}
-
+
void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
Y_UNUSED(mess);
Y_UNUSED(reply);
NumReplies++;
}
};
-
+
/////////////////////////////////////////////////////////////////
/// \brief DupDetect module
@@ -151,12 +151,12 @@ namespace NBus {
TBusClientSessionPtr HostInfoClientSession;
TBusClientSessionConfig HostInfoConfig;
THostInfoProtocol HostInfoProto;
-
+
TExampleProtocol DupDetectProto;
TBusServerSessionConfig DupDetectConfig;
-
+
TNetAddr ListenAddr;
-
+
TDupDetectModule(const TNetAddr& hostInfoAddr)
: TBusModule("DUPDETECTMODULE")
, HostInfoAddr(hostInfoAddr)
@@ -166,10 +166,10 @@ namespace NBus {
bool Init(TBusMessageQueue* queue) {
HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
HostInfoClientSession->RegisterService("localhost");
-
+
return TBusModule::CreatePrivateSessions(queue);
}
-
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
@@ -177,14 +177,14 @@ namespace NBus {
return session;
}
-
+
/// entry point into module, first function to call
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
Y_UNUSED(dmess);
-
+
THostInfoMessage* hmess = new THostInfoMessage();
-
+
/// send message to imaginary hostinfo server
job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);
@@ -195,27 +195,27 @@ namespace NBus {
TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
Y_UNUSED(dmess);
-
+
THostInfoMessage* hmess = job->Get<THostInfoMessage>();
THostInfoReply* hreply = job->Get<THostInfoReply>();
EMessageStatus hstatus = job->GetStatus<THostInfoMessage>();
Y_ASSERT(hmess != nullptr);
Y_ASSERT(hreply != nullptr);
Y_ASSERT(hstatus == MESSAGE_OK);
-
+
return TJobHandler(&TDupDetectModule::Finish);
}
/// last handler sends reply and returns NULL
TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
Y_UNUSED(mess);
-
+
TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount);
job->SendReply(reply);
-
+
return nullptr;
}
};
-
- }
+
+ }
}
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
index 61d3a465a7..9c21227e2b 100644
--- a/library/cpp/messagebus/test/ut/one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -1,134 +1,134 @@
-///////////////////////////////////////////////////////////////////
-/// \file
+///////////////////////////////////////////////////////////////////
+/// \file
/// \brief Example of reply-less communication
-/// This example demostrates how asynchronous message passing library
-/// can be used to send message and do not wait for reply back.
-/// The usage of reply-less communication should be restricted to
-/// low-throughput clients and high-throughput server to provide reasonable
-/// utility. Removing replies from the communication removes any restriction
-/// on how many message can be send to server and rougue clients may overwelm
-/// server without thoughtput control.
+/// This example demostrates how asynchronous message passing library
+/// can be used to send message and do not wait for reply back.
+/// The usage of reply-less communication should be restricted to
+/// low-throughput clients and high-throughput server to provide reasonable
+/// utility. Removing replies from the communication removes any restriction
+/// on how many message can be send to server and rougue clients may overwelm
+/// server without thoughtput control.
-/// 1) To implement reply-less client \n
+/// 1) To implement reply-less client \n
/// Call NBus::TBusSession::AckMessage()
/// from within NBus::IMessageHandler::OnSent() handler when message has
/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
-/// Discard identity for reply message.
+/// Discard identity for reply message.
-/// 2) To implement reply-less server \n
+/// 2) To implement reply-less server \n
-/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
+/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
/// handler when message has been received on server end.
/// See example in NBus::NullServer::OnMessage().
-/// Discard identity for reply message.
-
+/// Discard identity for reply message.
+
#include <library/cpp/messagebus/test/helper/alloc_counter.h>
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/hanging_server.h>
#include <library/cpp/messagebus/test/helper/message_handler_error.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>
-
+
#include <library/cpp/messagebus/ybus.h>
-using namespace std;
+using namespace std;
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NBus::NTest;
-
-////////////////////////////////////////////////////////////////////
-////////////////////////////////////////////////////////////////////
-/// \brief Reply-less client and handler
+
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+/// \brief Reply-less client and handler
struct NullClient : TBusClientHandlerError {
TNetAddr ServerAddr;
TBusMessageQueuePtr Queue;
TBusClientSessionPtr Session;
TExampleProtocol Proto;
-
- /// constructor creates instances of protocol and session
+
+ /// constructor creates instances of protocol and session
NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
: ServerAddr(serverAddr)
{
UNIT_ASSERT(serverAddr.GetPort() > 0);
-
- /// create or get instance of message queue, need one per application
+
+ /// create or get instance of message queue, need one per application
Queue = CreateMessageQueue();
-
- /// register source/client session
+
+ /// register source/client session
Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
/// register service, announce to clients via LocatorService
Session->RegisterService("localhost");
- }
-
+ }
+
~NullClient() override {
Session->Shutdown();
}
- /// dispatch of requests is done here
+ /// dispatch of requests is done here
void Work() {
- int batch = 10;
-
+ int batch = 10;
+
for (int i = 0; i < batch; i++) {
TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
mess->Data = "TADA";
Session->SendMessageOneWay(mess, &ServerAddr);
- }
- }
-
+ }
+ }
+
void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
- }
-};
-
-/////////////////////////////////////////////////////////////////////
-/// \brief Reply-less server and handler
+ }
+};
+
+/////////////////////////////////////////////////////////////////////
+/// \brief Reply-less server and handler
class NullServer: public TBusServerHandlerError {
-public:
- /// session object to maintian
+public:
+ /// session object to maintian
TBusMessageQueuePtr Queue;
TBusServerSessionPtr Session;
TExampleProtocol Proto;
-
-public:
+
+public:
TAtomic NumMessages;
-
- NullServer() {
- NumMessages = 0;
-
- /// create or get instance of single message queue, need one for application
+
+ NullServer() {
+ NumMessages = 0;
+
+ /// create or get instance of single message queue, need one for application
Queue = CreateMessageQueue();
-
- /// register destination session
+
+ /// register destination session
TBusServerSessionConfig sessionConfig;
Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
- }
-
+ }
+
~NullServer() override {
Session->Shutdown();
}
- /// when message comes do not send reply, just acknowledge
+ /// when message comes do not send reply, just acknowledge
void OnMessage(TOnMessageContext& mess) override {
TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
-
+
Y_ASSERT(fmess->Data == "TADA");
-
- /// tell session to forget this message and never expect any reply
+
+ /// tell session to forget this message and never expect any reply
mess.ForgetRequest();
-
+
AtomicIncrement(NumMessages);
- }
-
- /// this handler should not be called because this server does not send replies
+ }
+
+ /// this handler should not be called because this server does not send replies
void OnSent(TAutoPtr<TBusMessage> mess) override {
Y_UNUSED(mess);
Y_FAIL("This server does not sent replies");
}
-};
-
+};
+
Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
Y_UNIT_TEST(Simple) {
TObjectCountCheck objectCountCheck;
diff --git a/library/cpp/messagebus/ya.make b/library/cpp/messagebus/ya.make
index 5d26086a73..e13cf06dea 100644
--- a/library/cpp/messagebus/ya.make
+++ b/library/cpp/messagebus/ya.make
@@ -49,7 +49,7 @@ SRCS(
use_after_free_checker.cpp
use_count_checker.cpp
ybus.h
-)
+)
PEERDIR(
contrib/libs/sparsehash
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h
index 77839f0ef1..de21ad8521 100644
--- a/library/cpp/messagebus/ybus.h
+++ b/library/cpp/messagebus/ybus.h
@@ -27,23 +27,23 @@
#include <util/generic/buffer.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
-#include <util/stream/input.h>
-#include <util/system/atomic.h>
+#include <util/stream/input.h>
+#include <util/system/atomic.h>
#include <util/system/condvar.h>
#include <util/system/type_name.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
-
-namespace NBus {
+
+namespace NBus {
////////////////////////////////////////////////////////
/// \brief Common structure to store address information
-
+
int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept;
bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses
/////////////////////////////////////////////////////////////////////////
/// \brief Handles routing and data encoding to/from wire
-
+
/// Protocol is stateless threadsafe singleton object that
/// encapsulates relationship between a message (TBusMessage) object
/// and destination server. Protocol object is reponsible for serializing in-memory
@@ -69,7 +69,7 @@ namespace NBus {
TBusService GetService() const {
return ServiceName.data();
}
-
+
/// returns port number for destination session to open socket
int GetPort() const {
return ServicePort;
@@ -81,18 +81,18 @@ namespace NBus {
/// \brief serialized protocol specific data into TBusData
/// \note buffer passed to the function (data) is not empty, use append functions
virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0;
-
+
/// deserialized TBusData into new instance of the message
virtual TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) = 0;
-
+
/// returns key for messages of this protocol
virtual TBusKey GetKey(const TBusMessage*) {
return YBUS_KEYMIN;
}
-
+
/// default implementation of routing policy to allow overrides
virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr);
-
+
/// codec for transport level compression
virtual NCodecs::TCodecPtr GetTransportCodec(void) const {
return NCodecs::ICodec::GetInstance("snappy");
@@ -101,7 +101,7 @@ namespace NBus {
class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> {
friend class TBusMessageQueue;
-
+
public:
TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session);
~TBusSyncSourceSession();
@@ -139,9 +139,9 @@ namespace NBus {
TList<TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl>> Sessions;
TSimpleIntrusivePtr<TBusLocator> Locator;
NPrivate::TScheduler Scheduler;
-
+
::NActor::TExecutorPtr WorkQueue;
-
+
TAtomic Running;
TSystemEvent ShutdownComplete;
@@ -151,12 +151,12 @@ namespace NBus {
public:
TString GetNameInternal() const;
-
+
~TBusMessageQueue();
void Stop();
bool IsRunning();
-
+
public:
void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) {
WorkQueue->EnqueueWork(w);
@@ -180,11 +180,11 @@ namespace NBus {
TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = "");
TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = "");
TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name = "");
-
+
private:
void Destroy(TBusSession* session);
void Destroy(TBusSyncClientSessionPtr session);
-
+
public:
void Schedule(NPrivate::IScheduleItemAutoPtr i);
@@ -201,5 +201,5 @@ namespace NBus {
TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = "");
TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = "");
TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = "");
-
+
}
diff --git a/util/network/socket.cpp b/util/network/socket.cpp
index 51bb2beeef..4f6e804346 100644
--- a/util/network/socket.cpp
+++ b/util/network/socket.cpp
@@ -81,8 +81,8 @@ struct evpair {
};
static const evpair evpairs_to_win[] = {
- {POLLIN, FD_READ | FD_CLOSE | FD_ACCEPT},
- {POLLRDNORM, FD_READ | FD_CLOSE | FD_ACCEPT},
+ {POLLIN, FD_READ | FD_CLOSE | FD_ACCEPT},
+ {POLLRDNORM, FD_READ | FD_CLOSE | FD_ACCEPT},
{POLLRDBAND, -1},
{POLLPRI, -1},
{POLLOUT, FD_WRITE | FD_CLOSE},
@@ -95,7 +95,7 @@ static const evpair evpairs_to_win[] = {
static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]);
static const evpair evpairs_to_unix[] = {
- {FD_ACCEPT, POLLIN | POLLRDNORM},
+ {FD_ACCEPT, POLLIN | POLLRDNORM},
{FD_READ, POLLIN | POLLRDNORM},
{FD_WRITE, POLLOUT | POLLWRNORM},
{FD_CLOSE, POLLHUP},
@@ -150,8 +150,8 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
TWSAEventHolder event(rawEvent);
- int checked_sockets = 0;
-
+ int checked_sockets = 0;
+
for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
int win_events = convert_events(fd->events, evpairs_to_win, nevpairs_to_win, false);
if (win_events == -1) {
@@ -161,7 +161,7 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
fd->revents = 0;
if (WSAEventSelect(fd->fd, event.Get(), win_events)) {
int error = WSAGetLastError();
- if (error == WSAEINVAL || error == WSAENOTSOCK) {
+ if (error == WSAEINVAL || error == WSAENOTSOCK) {
fd->revents = POLLNVAL;
++checked_sockets;
} else {
@@ -169,34 +169,34 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
return -1;
}
}
- fd_set readfds;
- fd_set writefds;
+ fd_set readfds;
+ fd_set writefds;
struct timeval timeout = {0, 0};
- FD_ZERO(&readfds);
- FD_ZERO(&writefds);
- if (fd->events & POLLIN) {
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ if (fd->events & POLLIN) {
FD_SET(fd->fd, &readfds);
- }
- if (fd->events & POLLOUT) {
+ }
+ if (fd->events & POLLOUT) {
FD_SET(fd->fd, &writefds);
- }
+ }
int error = select(0, &readfds, &writefds, nullptr, &timeout);
- if (error > 0) {
+ if (error > 0) {
if (FD_ISSET(fd->fd, &readfds)) {
- fd->revents |= POLLIN;
- }
+ fd->revents |= POLLIN;
+ }
if (FD_ISSET(fd->fd, &writefds)) {
- fd->revents |= POLLOUT;
- }
+ fd->revents |= POLLOUT;
+ }
++checked_sockets;
- }
+ }
+ }
+
+ if (checked_sockets > 0) {
+ // returns without wait since we already have sockets in desired conditions
+ return checked_sockets;
}
- if (checked_sockets > 0) {
- // returns without wait since we already have sockets in desired conditions
- return checked_sockets;
- }
-
HANDLE events[] = {event.Get()};
DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE);
if (wait_result == WSA_WAIT_TIMEOUT)