aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorprateek <prateek@yandex-team.ru>2022-02-10 16:50:32 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:32 +0300
commit06e925754c8de946ff79d538bde1e6424cbd4cbb (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8
parent30744531a4767f053be08b22b325594d7ed8ffb3 (diff)
downloadydb-06e925754c8de946ff79d538bde1e6424cbd4cbb.tar.gz
Restoring authorship annotation for <prateek@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.h2
-rw-r--r--library/cpp/messagebus/synchandler.cpp74
-rw-r--r--library/cpp/messagebus/test/ut/sync_client_ut.cpp22
-rw-r--r--library/cpp/messagebus/ybus.h4
4 files changed, 51 insertions, 51 deletions
diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h
index cb31217dd8..5e26e7e1e5 100644
--- a/library/cpp/messagebus/oldmodule/startsession.h
+++ b/library/cpp/messagebus/oldmodule/startsession.h
@@ -27,7 +27,7 @@ namespace NBus {
public:
TBusStarter(TBusModule* module, const TBusSessionConfig& config);
~TBusStarter();
-
+
void Shutdown();
};
diff --git a/library/cpp/messagebus/synchandler.cpp b/library/cpp/messagebus/synchandler.cpp
index c95ada8039..8e891d66b3 100644
--- a/library/cpp/messagebus/synchandler.cpp
+++ b/library/cpp/messagebus/synchandler.cpp
@@ -1,32 +1,32 @@
#include "remote_client_session.h"
#include "remote_connection.h"
#include "ybus.h"
-
+
using namespace NBus;
using namespace NBus::NPrivate;
-/////////////////////////////////////////////////////////////////
-/// Object that encapsulates all messgae data required for sending
-/// a message synchronously and receiving a reply. It includes:
-/// 1. ConditionVariable to wait on message reply
-/// 2. Lock used by condition variable
-/// 3. Message reply
-/// 4. Reply status
-struct TBusSyncMessageData {
+/////////////////////////////////////////////////////////////////
+/// Object that encapsulates all messgae data required for sending
+/// a message synchronously and receiving a reply. It includes:
+/// 1. ConditionVariable to wait on message reply
+/// 2. Lock used by condition variable
+/// 3. Message reply
+/// 4. Reply status
+struct TBusSyncMessageData {
TCondVar ReplyEvent;
TMutex ReplyLock;
TBusMessage* Reply;
- EMessageStatus ReplyStatus;
-
- TBusSyncMessageData()
+ EMessageStatus ReplyStatus;
+
+ TBusSyncMessageData()
: Reply(nullptr)
- , ReplyStatus(MESSAGE_DONT_ASK)
+ , ReplyStatus(MESSAGE_DONT_ASK)
{
}
-};
-
+};
+
class TSyncHandler: public IBusClientHandler {
-public:
+public:
TSyncHandler(bool expectReply = true)
: ExpectReply(expectReply)
, Session(nullptr)
@@ -34,7 +34,7 @@ public:
}
~TSyncHandler() override {
}
-
+
void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override {
TBusMessage* pMessage = pMessage0.Release();
TBusMessage* pReply = pReply0.Release();
@@ -45,15 +45,15 @@ public:
TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
SignalResult(data, pReply, MESSAGE_OK);
- }
-
+ }
+
void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override {
TBusMessage* pMessage = pMessage0.Release();
TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
- if (!data) {
- return;
- }
-
+ if (!data) {
+ return;
+ }
+
SignalResult(data, /*pReply=*/nullptr, status);
}
@@ -77,18 +77,18 @@ public:
private:
void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const {
Y_VERIFY(data, "Message data is set to NULL.");
- TGuard<TMutex> G(data->ReplyLock);
+ TGuard<TMutex> G(data->ReplyLock);
data->Reply = pReply;
- data->ReplyStatus = status;
- data->ReplyEvent.Signal();
- }
+ data->ReplyStatus = status;
+ data->ReplyEvent.Signal();
+ }
private:
// This is weird, because in regular client one-way-ness is selected per call, not per session.
bool ExpectReply;
TRemoteClientSession* Session;
-};
-
+};
+
namespace NBus {
namespace NPrivate {
#ifdef _MSC_VER
@@ -168,31 +168,31 @@ void TBusSyncSourceSession::Shutdown() {
TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr) {
return Session->SendSyncMessage(pMessage, status, addr);
-}
-
+}
+
int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) {
return Session->RegisterService(hostname, start, end, ipVersion);
}
-
+
int TBusSyncSourceSession::GetInFlight() {
return Session->GetInFlight();
}
-
+
const TBusProtocol* TBusSyncSourceSession::GetProto() const {
return Session->GetProto();
}
-
+
const TBusClientSession* TBusSyncSourceSession::GetBusClientSessionWorkaroundDoNotUse() const {
return Session.Get();
}
-
+
TBusSyncClientSessionPtr TBusMessageQueue::CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name) {
TIntrusivePtr<TBusSyncSourceSessionImpl> session = new TBusSyncSourceSessionImpl(this, proto, config, needReply, name);
Add(session.Get());
return new TBusSyncSourceSession(session);
}
-
+
void TBusMessageQueue::Destroy(TBusSyncClientSessionPtr session) {
Destroy(session->Session.Get());
Y_UNUSED(session->Session.Release());
-}
+}
diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
index b3d5e74652..400128193f 100644
--- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp
+++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
@@ -1,7 +1,7 @@
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
-
-namespace NBus {
+
+namespace NBus {
namespace NTest {
using namespace std;
@@ -9,34 +9,34 @@ namespace NBus {
/// \brief Client for sending synchronous message to local server
struct TSyncClient {
TNetAddr ServerAddr;
-
+
TExampleProtocol Proto;
TBusMessageQueuePtr Bus;
TBusSyncClientSessionPtr Session;
int NumReplies;
int NumMessages;
-
+
/// constructor creates instances of queue, protocol and session
TSyncClient(const TNetAddr& serverAddr)
: ServerAddr(serverAddr)
{
/// create or get instance of message queue, need one per application
Bus = CreateMessageQueue();
-
+
NumReplies = 0;
NumMessages = 10;
-
+
/// register source/client session
TBusClientSessionConfig sessionConfig;
Session = Bus->CreateSyncSource(&Proto, sessionConfig);
Session->RegisterService("localhost");
}
-
+
~TSyncClient() {
Session->Shutdown();
}
-
+
/// dispatch of requests is done here
void Work() {
for (int i = 0; i < NumMessages; i++) {
@@ -49,7 +49,7 @@ namespace NBus {
}
}
};
-
+
Y_UNIT_TEST_SUITE(SyncClientTest) {
Y_UNIT_TEST(TestSync) {
TObjectCountCheck objectCountCheck;
@@ -63,7 +63,7 @@ namespace NBus {
UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
}
- }
-
+ }
+
}
}
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h
index b363899855..de21ad8521 100644
--- a/library/cpp/messagebus/ybus.h
+++ b/library/cpp/messagebus/ybus.h
@@ -29,10 +29,10 @@
#include <util/generic/ptr.h>
#include <util/stream/input.h>
#include <util/system/atomic.h>
-#include <util/system/condvar.h>
+#include <util/system/condvar.h>
#include <util/system/type_name.h>
#include <util/system/event.h>
-#include <util/system/mutex.h>
+#include <util/system/mutex.h>
namespace NBus {
////////////////////////////////////////////////////////