diff options
author | leo <leo@yandex-team.ru> | 2022-02-10 16:46:40 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:40 +0300 |
commit | 980edcd3304699edf9d4e4d6a656e585028e2a72 (patch) | |
tree | 139f47f3911484ae9af0eb347b1a88bd6c4bb35f /library/cpp/messagebus | |
parent | b036a557f285146e5e35d4213e29a094ab907bcf (diff) | |
download | ydb-980edcd3304699edf9d4e4d6a656e585028e2a72.tar.gz |
Restoring authorship annotation for <leo@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r-- | library/cpp/messagebus/coreconn.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/coreconn.h | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/handler.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/locator.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/message.cpp | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/messqueue.cpp | 24 | ||||
-rw-r--r-- | library/cpp/messagebus/network.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 46 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.h | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/session.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/synchandler.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/test/perftest/perftest.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/ybus.h | 6 |
18 files changed, 73 insertions, 73 deletions
diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp index d9411bb5db..d25f48930e 100644 --- a/library/cpp/messagebus/coreconn.cpp +++ b/library/cpp/messagebus/coreconn.cpp @@ -8,7 +8,7 @@ #include <util/string/util.h> #include <util/system/thread.h> -namespace NBus { +namespace NBus { TBusInstant Now() { return millisec(); } diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h index fca228d82e..7547c9aba6 100644 --- a/library/cpp/messagebus/coreconn.h +++ b/library/cpp/messagebus/coreconn.h @@ -27,10 +27,10 @@ #include <deque> #include <utility> -#ifdef NO_ERROR +#ifdef NO_ERROR #undef NO_ERROR -#endif - +#endif + #define BUS_WORKER_CONDVAR //#define BUS_WORKER_MIXED @@ -63,5 +63,5 @@ namespace NBus { POLL_READ, POLL_WRITE }; - + } diff --git a/library/cpp/messagebus/handler.cpp b/library/cpp/messagebus/handler.cpp index 333bd52934..3ad336c91d 100644 --- a/library/cpp/messagebus/handler.cpp +++ b/library/cpp/messagebus/handler.cpp @@ -1,7 +1,7 @@ #include "handler.h" #include "remote_server_connection.h" -#include "ybus.h" +#include "ybus.h" using namespace NBus; using namespace NBus::NPrivate; diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp index e38a35c426..11716e3cd5 100644 --- a/library/cpp/messagebus/locator.cpp +++ b/library/cpp/messagebus/locator.cpp @@ -9,7 +9,7 @@ #include <util/generic/hash_set.h> #include <util/system/hostname.h> -namespace NBus { +namespace NBus { using namespace NAddr; static TIpPort GetAddrPort(const IRemoteAddr& addr) { @@ -360,7 +360,7 @@ namespace NBus { } int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) { - TGuard<TMutex> G(Lock); + TGuard<TMutex> G(Lock); Y_VERIFY(keys.empty(), "Non empty keys"); TServiceId serviceId = GetServiceId(service); @@ -374,7 +374,7 @@ namespace NBus { continue; } keys.push_back(std::make_pair(item.Start, item.End)); - } + } return (int)keys.size(); } diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp index bfa7ed8e9b..0fba16bc68 100644 --- a/library/cpp/messagebus/message.cpp +++ b/library/cpp/messagebus/message.cpp @@ -9,7 +9,7 @@ using namespace NBus; -namespace NBus { +namespace NBus { using namespace NBus::NPrivate; TBusIdentity::TBusIdentity() @@ -156,7 +156,7 @@ namespace NBus { ReplyTo = data.Connection->PeerAddrSocketAddr; SetCompressed(compressed || IsCompressedResponse()); } - + void TBusMessage::SetCompressed(bool v) { if (v) { GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_INTERNAL; diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index 3474d62705..5de2c1a12c 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -24,17 +24,17 @@ TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBus TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) { return CreateMessageQueue(config, new TBusLocator, name); -} - +} + TBusMessageQueuePtr NBus::CreateMessageQueue(TExecutorPtr executor, const char* name) { return CreateMessageQueue(TBusQueueConfig(), executor, new TBusLocator, name); } TBusMessageQueuePtr NBus::CreateMessageQueue(const char* name) { - TBusQueueConfig config; + TBusQueueConfig config; return CreateMessageQueue(config, name); -} - +} + namespace { TBusQueueConfig QueueConfigFillDefaults(const TBusQueueConfig& orig, const TString& name) { TBusQueueConfig patched = orig; @@ -148,8 +148,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB } catch (...) { Y_FAIL("create destination failure: %s", CurrentExceptionMessage().c_str()); } -} - +} + TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name) { TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name)); try { @@ -179,15 +179,15 @@ void TBusMessageQueue::Destroy(TBusSession* session) { void TBusMessageQueue::DestroyAllSessions() { TList<TIntrusivePtr<TBusSessionImpl>> sessions; - { - TGuard<TMutex> scope(Lock); + { + TGuard<TMutex> scope(Lock); sessions = Sessions; - } - + } + for (auto& session : sessions) { Y_VERIFY(session->IsDown(), "Session must be shut down prior to queue shutdown"); } -} +} void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) { Scheduler.Schedule(i); diff --git a/library/cpp/messagebus/network.cpp b/library/cpp/messagebus/network.cpp index 304bedae5a..214fbb5f1f 100644 --- a/library/cpp/messagebus/network.cpp +++ b/library/cpp/messagebus/network.cpp @@ -1,8 +1,8 @@ -#include "network.h" - +#include "network.h" + #include <util/generic/maybe.h> #include <util/generic/ptr.h> -#include <util/network/init.h> +#include <util/network/init.h> #include <util/network/socket.h> #include <util/system/platform.h> @@ -19,7 +19,7 @@ namespace { } int one = 1; - int r1 = SetSockOpt(fd, SOL_SOCKET, SO_REUSEADDR, one); + int r1 = SetSockOpt(fd, SOL_SOCKET, SO_REUSEADDR, one); if (r1 < 0) { ythrow TSystemError() << "failed to setsockopt SO_REUSEADDR"; } diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..3dc9c80a94 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -60,7 +60,7 @@ namespace { } -namespace NBus { +namespace NBus { namespace NPrivate { class TJobStorage { }; @@ -327,11 +327,11 @@ namespace NBus { { Handler = TJobHandler(&TBusModule::Start); } - + TBusJob::~TBusJob() { Y_ASSERT(Pending.size() == 0); //Y_ASSERT(SleepUntil == 0); - + ClearAllMessageStates(); } @@ -618,7 +618,7 @@ namespace NBus { } ////////////////////////////////////////////////////////////////////// - + void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) { TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob"); if (job) { @@ -636,8 +636,8 @@ namespace NBus { strReturn += "TODO\n"; } return strReturn; - } - + } + TBusModuleConfig::TBusModuleConfig() : StarterMaxInFlight(1000) { @@ -652,10 +652,10 @@ namespace NBus { : Impl(new TBusModuleImpl(this, name)) { } - + TBusModule::~TBusModule() { } - + const char* TBusModule::GetName() const { return Impl->Name; } @@ -680,7 +680,7 @@ namespace NBus { bool TBusModule::Shutdown() { Impl->Shutdown(); - + return true; } @@ -688,16 +688,16 @@ namespace NBus { TBusJob* job = new TBusJob(this, message); return job; } - + /** -Example for external session creation: - -TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { - TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig); - session->RegisterService(hostname, begin, end); - return session; -*/ - +Example for external session creation: + +TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { + TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig); + session->RegisterService(hostname, begin, end); + return session; +*/ + bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) { Impl->Queue = queue; return true; @@ -782,11 +782,11 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) { ShutdownCondVar.BroadCast(); } } - } + } job->JobStorageIterator = TList<TJobRunner*>::iterator(); } - + void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) { TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage(); Y_VERIFY(!!msg); @@ -797,7 +797,7 @@ void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageCon SetJob(jobRunner->Job->Message, jobRunner.Get()); AtomicIncrement(JobCount); - + AddJob(jobRunner.Get()); jobRunner.Release()->Schedule(); @@ -858,14 +858,14 @@ void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessa job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK)); job->UnRef(); } - + void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) { TJobRunner* job = GetJob(req.Get()); Y_ASSERT(job); Y_ASSERT(job->Job->Message != req.Get()); job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK)); job->UnRef(); -} +} void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) { TJobRunner* job = GetJob(msg.Get()); diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index 8d1c4a5d52..dc3b6377f0 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -9,7 +9,7 @@ /// NBus::TBusSession. /// To implement the module some virtual functions needs to be overridden: - + /// NBus::TBusModule::CreateExtSession() creates and registers an /// external session that receives incoming messages as input for module /// processing. @@ -18,13 +18,13 @@ /// NBus::TBusJob is somewhat similar to a thread, it maintains all the state /// during processing of one incoming message. Default implementation of /// NBus::TBusJob will maintain all send and received messages during -/// lifetime of this job. Each message, status and reply can be found +/// lifetime of this job. Each message, status and reply can be found /// within NBus::TJobState using NBus::TBusJob::GetState(). If your module /// needs to maintain an additional information during lifetime of the job /// you can derive your own class from NBus::TBusJob and override job -/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances. +/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances. -/// Processing of a given message starts with a call to NBus::TBusModule::Start() +/// Processing of a given message starts with a call to NBus::TBusModule::Start() /// handler that should be overridden in the module implementation. Within /// the callback handler module can perform any computation and access any /// datastore tables that it needs. The handler can also access any module @@ -33,7 +33,7 @@ /// Handler should use NBus::TBusJob::Send() to send messages to other client /// sessions and it can use NBus::TBusJob::Reply() to send reply to the main -/// job message. When handler is done, it returns the pointer to the next handler to call +/// job message. When handler is done, it returns the pointer to the next handler to call /// when all pending messages have cleared. If handler /// returns pointer to itself the module will reschedule execution of this handler /// for a later time. This should be done in case NBus::TBusJob::Send() returns diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp index 7c38801d62..0827972d88 100644 --- a/library/cpp/messagebus/oldmodule/startsession.cpp +++ b/library/cpp/messagebus/oldmodule/startsession.cpp @@ -18,7 +18,7 @@ #include <library/cpp/messagebus/ybus.h> -namespace NBus { +namespace NBus { void* TBusStarter::_starter(void* data) { TBusStarter* pThis = static_cast<TBusStarter*>(data); pThis->Starter(); @@ -61,5 +61,5 @@ namespace NBus { } } } - -} + +} diff --git a/library/cpp/messagebus/protobuf/ybusbuf.h b/library/cpp/messagebus/protobuf/ybusbuf.h index 57b4267ea5..1fed492d5c 100644 --- a/library/cpp/messagebus/protobuf/ybusbuf.h +++ b/library/cpp/messagebus/protobuf/ybusbuf.h @@ -226,8 +226,8 @@ namespace NBus { /// serialized protocol specific data into TBusData void Serialize(const TBusMessage* mess, TBuffer& data) override; - + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; }; - + } diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp index 5a5fe52894..dd9767c25a 100644 --- a/library/cpp/messagebus/scheduler/scheduler.cpp +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -4,7 +4,7 @@ #include <util/generic/algorithm.h> #include <util/generic/yexception.h> -//#include "dummy_debugger.h" +//#include "dummy_debugger.h" using namespace NBus; using namespace NBus::NPrivate; diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp index 46a7ece6a8..69b6825bb8 100644 --- a/library/cpp/messagebus/session.cpp +++ b/library/cpp/messagebus/session.cpp @@ -4,7 +4,7 @@ using namespace NBus; -namespace NBus { +namespace NBus { TBusSession::TBusSession() { } diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp index 8e891d66b3..30c2cc3806 100644 --- a/library/cpp/messagebus/synchandler.cpp +++ b/library/cpp/messagebus/synchandler.cpp @@ -142,7 +142,7 @@ namespace NBus { // deletion of message and reply is a job of application. pMessage->Data = nullptr; - + return reply; } }; diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index 8489319278..f7f5e88086 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -593,12 +593,12 @@ int main(int argc, char* argv[]) { /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); - Umask(0); + Umask(0); SetAsyncSignalHandler(SIGINT, stopsignal); SetAsyncSignalHandler(SIGTERM, stopsignal); -#ifndef _win_ +#ifndef _win_ SetAsyncSignalHandler(SIGUSR1, stopsignal); -#endif +#endif signal(SIGPIPE, SIG_IGN); NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); @@ -641,7 +641,7 @@ int main(int argc, char* argv[]) { TIntrusivePtr<TBusWww> www(new TBusWww); ServerAddresses = ParseNodes(TheConfig->Nodes); - + if (TheConfig->ServerPort) { if (TheConfig->ServerUseModules) { ServerUsingModule = new TPerftestUsingModule(); diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 9c21227e2b..16065289ad 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -55,7 +55,7 @@ struct NullClient : TBusClientHandlerError { UNIT_ASSERT(serverAddr.GetPort() > 0); /// create or get instance of message queue, need one per application - Queue = CreateMessageQueue(); + Queue = CreateMessageQueue(); /// register source/client session Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); @@ -99,7 +99,7 @@ public: NumMessages = 0; /// create or get instance of single message queue, need one for application - Queue = CreateMessageQueue(); + Queue = CreateMessageQueue(); /// register destination session TBusServerSessionConfig sessionConfig; diff --git a/library/cpp/messagebus/ya.make b/library/cpp/messagebus/ya.make index e13cf06dea..e8b6701eed 100644 --- a/library/cpp/messagebus/ya.make +++ b/library/cpp/messagebus/ya.make @@ -1,4 +1,4 @@ -LIBRARY() +LIBRARY() OWNER(g:messagebus) diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index de21ad8521..2393b840ac 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -74,10 +74,10 @@ namespace NBus { int GetPort() const { return ServicePort; } - + virtual ~TBusProtocol() { } - + /// \brief serialized protocol specific data into TBusData /// \note buffer passed to the function (data) is not empty, use append functions virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0; @@ -175,7 +175,7 @@ namespace NBus { TBusLocator* GetLocator() const { return Locator.Get(); } - + TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = ""); TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = ""); TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = ""); |