aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorleo <leo@yandex-team.ru>2022-02-10 16:46:40 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:40 +0300
commit99609724f661f7e21d1cb08e8d80e87c3632fdb3 (patch)
tree49e222ea1c5804306084bb3ae065bb702625360f /library/cpp/messagebus
parent980edcd3304699edf9d4e4d6a656e585028e2a72 (diff)
downloadydb-99609724f661f7e21d1cb08e8d80e87c3632fdb3.tar.gz
Restoring authorship annotation for <leo@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/coreconn.cpp2
-rw-r--r--library/cpp/messagebus/coreconn.h8
-rw-r--r--library/cpp/messagebus/handler.cpp2
-rw-r--r--library/cpp/messagebus/locator.cpp6
-rw-r--r--library/cpp/messagebus/message.cpp4
-rw-r--r--library/cpp/messagebus/messqueue.cpp24
-rw-r--r--library/cpp/messagebus/network.cpp8
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp46
-rw-r--r--library/cpp/messagebus/oldmodule/module.h10
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.cpp6
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.h4
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.cpp2
-rw-r--r--library/cpp/messagebus/session.cpp2
-rw-r--r--library/cpp/messagebus/synchandler.cpp2
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp8
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp4
-rw-r--r--library/cpp/messagebus/ya.make2
-rw-r--r--library/cpp/messagebus/ybus.h6
18 files changed, 73 insertions, 73 deletions
diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp
index d25f48930e..d9411bb5db 100644
--- a/library/cpp/messagebus/coreconn.cpp
+++ b/library/cpp/messagebus/coreconn.cpp
@@ -8,7 +8,7 @@
#include <util/string/util.h>
#include <util/system/thread.h>
-namespace NBus {
+namespace NBus {
TBusInstant Now() {
return millisec();
}
diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h
index 7547c9aba6..fca228d82e 100644
--- a/library/cpp/messagebus/coreconn.h
+++ b/library/cpp/messagebus/coreconn.h
@@ -27,10 +27,10 @@
#include <deque>
#include <utility>
-#ifdef NO_ERROR
+#ifdef NO_ERROR
#undef NO_ERROR
-#endif
-
+#endif
+
#define BUS_WORKER_CONDVAR
//#define BUS_WORKER_MIXED
@@ -63,5 +63,5 @@ namespace NBus {
POLL_READ,
POLL_WRITE
};
-
+
}
diff --git a/library/cpp/messagebus/handler.cpp b/library/cpp/messagebus/handler.cpp
index 3ad336c91d..333bd52934 100644
--- a/library/cpp/messagebus/handler.cpp
+++ b/library/cpp/messagebus/handler.cpp
@@ -1,7 +1,7 @@
#include "handler.h"
#include "remote_server_connection.h"
-#include "ybus.h"
+#include "ybus.h"
using namespace NBus;
using namespace NBus::NPrivate;
diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp
index 11716e3cd5..e38a35c426 100644
--- a/library/cpp/messagebus/locator.cpp
+++ b/library/cpp/messagebus/locator.cpp
@@ -9,7 +9,7 @@
#include <util/generic/hash_set.h>
#include <util/system/hostname.h>
-namespace NBus {
+namespace NBus {
using namespace NAddr;
static TIpPort GetAddrPort(const IRemoteAddr& addr) {
@@ -360,7 +360,7 @@ namespace NBus {
}
int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
- TGuard<TMutex> G(Lock);
+ TGuard<TMutex> G(Lock);
Y_VERIFY(keys.empty(), "Non empty keys");
TServiceId serviceId = GetServiceId(service);
@@ -374,7 +374,7 @@ namespace NBus {
continue;
}
keys.push_back(std::make_pair(item.Start, item.End));
- }
+ }
return (int)keys.size();
}
diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp
index 0fba16bc68..bfa7ed8e9b 100644
--- a/library/cpp/messagebus/message.cpp
+++ b/library/cpp/messagebus/message.cpp
@@ -9,7 +9,7 @@
using namespace NBus;
-namespace NBus {
+namespace NBus {
using namespace NBus::NPrivate;
TBusIdentity::TBusIdentity()
@@ -156,7 +156,7 @@ namespace NBus {
ReplyTo = data.Connection->PeerAddrSocketAddr;
SetCompressed(compressed || IsCompressedResponse());
}
-
+
void TBusMessage::SetCompressed(bool v) {
if (v) {
GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL;
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 5de2c1a12c..3474d62705 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -24,17 +24,17 @@ TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBus
TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) {
return CreateMessageQueue(config, new TBusLocator, name);
-}
-
+}
+
TBusMessageQueuePtr NBus::CreateMessageQueue(TExecutorPtr executor, const char* name) {
return CreateMessageQueue(TBusQueueConfig(), executor, new TBusLocator, name);
}
TBusMessageQueuePtr NBus::CreateMessageQueue(const char* name) {
- TBusQueueConfig config;
+ TBusQueueConfig config;
return CreateMessageQueue(config, name);
-}
-
+}
+
namespace {
TBusQueueConfig QueueConfigFillDefaults(const TBusQueueConfig& orig, const TString& name) {
TBusQueueConfig patched = orig;
@@ -148,8 +148,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
} catch (...) {
Y_FAIL("create destination failure: %s", CurrentExceptionMessage().c_str());
}
-}
-
+}
+
TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name) {
TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name));
try {
@@ -179,15 +179,15 @@ void TBusMessageQueue::Destroy(TBusSession* session) {
void TBusMessageQueue::DestroyAllSessions() {
TList<TIntrusivePtr<TBusSessionImpl>> sessions;
- {
- TGuard<TMutex> scope(Lock);
+ {
+ TGuard<TMutex> scope(Lock);
sessions = Sessions;
- }
-
+ }
+
for (auto& session : sessions) {
Y_VERIFY(session->IsDown(), "Session must be shut down prior to queue shutdown");
}
-}
+}
void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) {
Scheduler.Schedule(i);
diff --git a/library/cpp/messagebus/network.cpp b/library/cpp/messagebus/network.cpp
index 214fbb5f1f..304bedae5a 100644
--- a/library/cpp/messagebus/network.cpp
+++ b/library/cpp/messagebus/network.cpp
@@ -1,8 +1,8 @@
-#include "network.h"
-
+#include "network.h"
+
#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
-#include <util/network/init.h>
+#include <util/network/init.h>
#include <util/network/socket.h>
#include <util/system/platform.h>
@@ -19,7 +19,7 @@ namespace {
}
int one = 1;
- int r1 = SetSockOpt(fd, SOL_SOCKET, SO_REUSEADDR, one);
+ int r1 = SetSockOpt(fd, SOL_SOCKET, SO_REUSEADDR, one);
if (r1 < 0) {
ythrow TSystemError() << "failed to setsockopt SO_REUSEADDR";
}
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 3dc9c80a94..24bd778799 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -60,7 +60,7 @@ namespace {
}
-namespace NBus {
+namespace NBus {
namespace NPrivate {
class TJobStorage {
};
@@ -327,11 +327,11 @@ namespace NBus {
{
Handler = TJobHandler(&TBusModule::Start);
}
-
+
TBusJob::~TBusJob() {
Y_ASSERT(Pending.size() == 0);
//Y_ASSERT(SleepUntil == 0);
-
+
ClearAllMessageStates();
}
@@ -618,7 +618,7 @@ namespace NBus {
}
//////////////////////////////////////////////////////////////////////
-
+
void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) {
TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob");
if (job) {
@@ -636,8 +636,8 @@ namespace NBus {
strReturn += "TODO\n";
}
return strReturn;
- }
-
+ }
+
TBusModuleConfig::TBusModuleConfig()
: StarterMaxInFlight(1000)
{
@@ -652,10 +652,10 @@ namespace NBus {
: Impl(new TBusModuleImpl(this, name))
{
}
-
+
TBusModule::~TBusModule() {
}
-
+
const char* TBusModule::GetName() const {
return Impl->Name;
}
@@ -680,7 +680,7 @@ namespace NBus {
bool TBusModule::Shutdown() {
Impl->Shutdown();
-
+
return true;
}
@@ -688,16 +688,16 @@ namespace NBus {
TBusJob* job = new TBusJob(this, message);
return job;
}
-
+
/**
-Example for external session creation:
-
-TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
- TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig);
- session->RegisterService(hostname, begin, end);
- return session;
-*/
-
+Example for external session creation:
+
+TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
+ TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig);
+ session->RegisterService(hostname, begin, end);
+ return session;
+*/
+
bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) {
Impl->Queue = queue;
return true;
@@ -782,11 +782,11 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) {
ShutdownCondVar.BroadCast();
}
}
- }
+ }
job->JobStorageIterator = TList<TJobRunner*>::iterator();
}
-
+
void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
Y_VERIFY(!!msg);
@@ -797,7 +797,7 @@ void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageCon
SetJob(jobRunner->Job->Message, jobRunner.Get());
AtomicIncrement(JobCount);
-
+
AddJob(jobRunner.Get());
jobRunner.Release()->Schedule();
@@ -858,14 +858,14 @@ void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessa
job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK));
job->UnRef();
}
-
+
void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) {
TJobRunner* job = GetJob(req.Get());
Y_ASSERT(job);
Y_ASSERT(job->Job->Message != req.Get());
job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK));
job->UnRef();
-}
+}
void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) {
TJobRunner* job = GetJob(msg.Get());
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index dc3b6377f0..8d1c4a5d52 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -9,7 +9,7 @@
/// NBus::TBusSession.
/// To implement the module some virtual functions needs to be overridden:
-
+
/// NBus::TBusModule::CreateExtSession() creates and registers an
/// external session that receives incoming messages as input for module
/// processing.
@@ -18,13 +18,13 @@
/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
/// during processing of one incoming message. Default implementation of
/// NBus::TBusJob will maintain all send and received messages during
-/// lifetime of this job. Each message, status and reply can be found
+/// lifetime of this job. Each message, status and reply can be found
/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
/// needs to maintain an additional information during lifetime of the job
/// you can derive your own class from NBus::TBusJob and override job
-/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
+/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
-/// Processing of a given message starts with a call to NBus::TBusModule::Start()
+/// Processing of a given message starts with a call to NBus::TBusModule::Start()
/// handler that should be overridden in the module implementation. Within
/// the callback handler module can perform any computation and access any
/// datastore tables that it needs. The handler can also access any module
@@ -33,7 +33,7 @@
/// Handler should use NBus::TBusJob::Send() to send messages to other client
/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
-/// job message. When handler is done, it returns the pointer to the next handler to call
+/// job message. When handler is done, it returns the pointer to the next handler to call
/// when all pending messages have cleared. If handler
/// returns pointer to itself the module will reschedule execution of this handler
/// for a later time. This should be done in case NBus::TBusJob::Send() returns
diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp
index 0827972d88..7c38801d62 100644
--- a/library/cpp/messagebus/oldmodule/startsession.cpp
+++ b/library/cpp/messagebus/oldmodule/startsession.cpp
@@ -18,7 +18,7 @@
#include <library/cpp/messagebus/ybus.h>
-namespace NBus {
+namespace NBus {
void* TBusStarter::_starter(void* data) {
TBusStarter* pThis = static_cast<TBusStarter*>(data);
pThis->Starter();
@@ -61,5 +61,5 @@ namespace NBus {
}
}
}
-
-}
+
+}
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h
index 1fed492d5c..57b4267ea5 100644
--- a/library/cpp/messagebus/protobuf/ybusbuf.h
+++ b/library/cpp/messagebus/protobuf/ybusbuf.h
@@ -226,8 +226,8 @@ namespace NBus {
/// serialized protocol specific data into TBusData
void Serialize(const TBusMessage* mess, TBuffer& data) override;
-
+
TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
};
-
+
}
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp
index dd9767c25a..5a5fe52894 100644
--- a/library/cpp/messagebus/scheduler/scheduler.cpp
+++ b/library/cpp/messagebus/scheduler/scheduler.cpp
@@ -4,7 +4,7 @@
#include <util/generic/algorithm.h>
#include <util/generic/yexception.h>
-//#include "dummy_debugger.h"
+//#include "dummy_debugger.h"
using namespace NBus;
using namespace NBus::NPrivate;
diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp
index 69b6825bb8..46a7ece6a8 100644
--- a/library/cpp/messagebus/session.cpp
+++ b/library/cpp/messagebus/session.cpp
@@ -4,7 +4,7 @@
using namespace NBus;
-namespace NBus {
+namespace NBus {
TBusSession::TBusSession() {
}
diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp
index 30c2cc3806..8e891d66b3 100644
--- a/library/cpp/messagebus/synchandler.cpp
+++ b/library/cpp/messagebus/synchandler.cpp
@@ -142,7 +142,7 @@ namespace NBus {
// deletion of message and reply is a job of application.
pMessage->Data = nullptr;
-
+
return reply;
}
};
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index f7f5e88086..8489319278 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -593,12 +593,12 @@ int main(int argc, char* argv[]) {
/* unix foo */
setvbuf(stdout, nullptr, _IONBF, 0);
setvbuf(stderr, nullptr, _IONBF, 0);
- Umask(0);
+ Umask(0);
SetAsyncSignalHandler(SIGINT, stopsignal);
SetAsyncSignalHandler(SIGTERM, stopsignal);
-#ifndef _win_
+#ifndef _win_
SetAsyncSignalHandler(SIGUSR1, stopsignal);
-#endif
+#endif
signal(SIGPIPE, SIG_IGN);
NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
@@ -641,7 +641,7 @@ int main(int argc, char* argv[]) {
TIntrusivePtr<TBusWww> www(new TBusWww);
ServerAddresses = ParseNodes(TheConfig->Nodes);
-
+
if (TheConfig->ServerPort) {
if (TheConfig->ServerUseModules) {
ServerUsingModule = new TPerftestUsingModule();
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
index 16065289ad..9c21227e2b 100644
--- a/library/cpp/messagebus/test/ut/one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -55,7 +55,7 @@ struct NullClient : TBusClientHandlerError {
UNIT_ASSERT(serverAddr.GetPort() > 0);
/// create or get instance of message queue, need one per application
- Queue = CreateMessageQueue();
+ Queue = CreateMessageQueue();
/// register source/client session
Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
@@ -99,7 +99,7 @@ public:
NumMessages = 0;
/// create or get instance of single message queue, need one for application
- Queue = CreateMessageQueue();
+ Queue = CreateMessageQueue();
/// register destination session
TBusServerSessionConfig sessionConfig;
diff --git a/library/cpp/messagebus/ya.make b/library/cpp/messagebus/ya.make
index e8b6701eed..e13cf06dea 100644
--- a/library/cpp/messagebus/ya.make
+++ b/library/cpp/messagebus/ya.make
@@ -1,4 +1,4 @@
-LIBRARY()
+LIBRARY()
OWNER(g:messagebus)
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h
index 2393b840ac..de21ad8521 100644
--- a/library/cpp/messagebus/ybus.h
+++ b/library/cpp/messagebus/ybus.h
@@ -74,10 +74,10 @@ namespace NBus {
int GetPort() const {
return ServicePort;
}
-
+
virtual ~TBusProtocol() {
}
-
+
/// \brief serialized protocol specific data into TBusData
/// \note buffer passed to the function (data) is not empty, use append functions
virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0;
@@ -175,7 +175,7 @@ namespace NBus {
TBusLocator* GetLocator() const {
return Locator.Get();
}
-
+
TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = "");
TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = "");
TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = "");