aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:47 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:47 +0300
commita5950576e397b1909261050b8c7da16db58f10b1 (patch)
tree7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library
parent81eddc8c0b55990194e112b02d127b87d54164a9 (diff)
downloadydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/archive/ya.make20
-rw-r--r--library/cpp/blockcodecs/core/stream.cpp6
-rw-r--r--library/cpp/cache/ya.make16
-rw-r--r--library/cpp/cgiparam/cgiparam.h22
-rw-r--r--library/cpp/comptable/usage/usage.cpp6
-rw-r--r--library/cpp/containers/comptrie/comptrie_ut.cpp6
-rw-r--r--library/cpp/coroutine/engine/coroutine_ut.cpp2
-rw-r--r--library/cpp/dbg_output/dump.h4
-rw-r--r--library/cpp/deprecated/mapped_file/mapped_file.cpp26
-rw-r--r--library/cpp/deprecated/mapped_file/mapped_file.h12
-rw-r--r--library/cpp/digest/old_crc/crc.h6
-rw-r--r--library/cpp/http/server/http.cpp292
-rw-r--r--library/cpp/http/server/http.h126
-rw-r--r--library/cpp/http/server/http_ut.cpp2
-rw-r--r--library/cpp/http/server/ut/ya.make2
-rw-r--r--library/cpp/lfalloc/lf_allocX64.h6
-rw-r--r--library/cpp/messagebus/config/netaddr.h10
-rw-r--r--library/cpp/messagebus/coreconn.cpp4
-rw-r--r--library/cpp/messagebus/coreconn.h24
-rw-r--r--library/cpp/messagebus/event_loop.cpp330
-rw-r--r--library/cpp/messagebus/event_loop.h108
-rw-r--r--library/cpp/messagebus/messqueue.cpp10
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp16
-rw-r--r--library/cpp/messagebus/oldmodule/module.h2
-rw-r--r--library/cpp/messagebus/remote_client_session.cpp12
-rw-r--r--library/cpp/messagebus/remote_client_session.h6
-rw-r--r--library/cpp/messagebus/remote_connection.cpp58
-rw-r--r--library/cpp/messagebus/remote_connection.h26
-rw-r--r--library/cpp/messagebus/remote_server_session.cpp8
-rw-r--r--library/cpp/messagebus/remote_server_session.h8
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.cpp72
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.h32
-rw-r--r--library/cpp/messagebus/session_impl.cpp72
-rw-r--r--library/cpp/messagebus/session_impl.h36
-rw-r--r--library/cpp/messagebus/storage.cpp26
-rw-r--r--library/cpp/messagebus/storage.h20
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp8
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp4
-rw-r--r--library/cpp/messagebus/ybus.h2
-rw-r--r--library/cpp/monlib/service/service.h2
-rw-r--r--library/cpp/packedtypes/packed.h4
-rw-r--r--library/cpp/packedtypes/ya.make26
-rw-r--r--library/cpp/sighandler/ya.make18
-rw-r--r--library/cpp/string_utils/url/url.cpp24
-rw-r--r--library/cpp/string_utils/url/url.h6
-rw-r--r--library/cpp/string_utils/url/url_ut.cpp14
-rw-r--r--library/cpp/testing/unittest/utmain.cpp4
-rw-r--r--library/cpp/threading/future/async.h6
-rw-r--r--library/cpp/unicode/punycode/punycode_ut.cpp16
-rw-r--r--library/cpp/yson_pull/scalar.h2
-rw-r--r--library/python/symbols/libc/ya.make2
51 files changed, 786 insertions, 786 deletions
diff --git a/library/cpp/archive/ya.make b/library/cpp/archive/ya.make
index 65d36479ef..067daad0dd 100644
--- a/library/cpp/archive/ya.make
+++ b/library/cpp/archive/ya.make
@@ -1,12 +1,12 @@
-LIBRARY()
-
-OWNER(pg)
-
-SRCS(
- yarchive.cpp
- yarchive.h
+LIBRARY()
+
+OWNER(pg)
+
+SRCS(
+ yarchive.cpp
+ yarchive.h
directory_models_archive_reader.cpp
directory_models_archive_reader.h
-)
-
-END()
+)
+
+END()
diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp
index 4f7db3c32b..c4b4865245 100644
--- a/library/cpp/blockcodecs/core/stream.cpp
+++ b/library/cpp/blockcodecs/core/stream.cpp
@@ -186,10 +186,10 @@ size_t TDecodedInput::DoUnboundedNext(const void** ptr) {
return 0;
}
- if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) {
+ if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) {
ythrow yexception() << "block size exceeds 1 GiB";
- }
-
+ }
+
TBuffer block;
block.Resize(blockLen);
diff --git a/library/cpp/cache/ya.make b/library/cpp/cache/ya.make
index fd73032bf8..85f6127060 100644
--- a/library/cpp/cache/ya.make
+++ b/library/cpp/cache/ya.make
@@ -1,15 +1,15 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
g:util
)
-
-SRCS(
- cache.cpp
+
+SRCS(
+ cache.cpp
thread_safe_cache.cpp
-)
-
-END()
+)
+
+END()
RECURSE_FOR_TESTS(
ut
diff --git a/library/cpp/cgiparam/cgiparam.h b/library/cpp/cgiparam/cgiparam.h
index 87d1ab0ad4..1da4452adf 100644
--- a/library/cpp/cgiparam/cgiparam.h
+++ b/library/cpp/cgiparam/cgiparam.h
@@ -87,21 +87,21 @@ public:
void InsertEscaped(const TStringBuf name, const TStringBuf value);
-#if !defined(__GLIBCXX__)
+#if !defined(__GLIBCXX__)
template <typename TName, typename TValue>
inline void InsertUnescaped(TName&& name, TValue&& value) {
- // TStringBuf use as TName or TValue is C++17 actually.
- // There is no pair constructor available in C++14 when required type
- // is not implicitly constructible from given type.
- // But libc++ pair allows this with C++14.
+ // TStringBuf use as TName or TValue is C++17 actually.
+ // There is no pair constructor available in C++14 when required type
+ // is not implicitly constructible from given type.
+ // But libc++ pair allows this with C++14.
emplace(std::forward<TName>(name), std::forward<TValue>(value));
}
-#else
- template <typename TName, typename TValue>
- inline void InsertUnescaped(TName&& name, TValue&& value) {
- emplace(TString(name), TString(value));
- }
-#endif
+#else
+ template <typename TName, typename TValue>
+ inline void InsertUnescaped(TName&& name, TValue&& value) {
+ emplace(TString(name), TString(value));
+ }
+#endif
// replace all values for a given key with new values
template <typename TIter>
diff --git a/library/cpp/comptable/usage/usage.cpp b/library/cpp/comptable/usage/usage.cpp
index 9997c83686..b15d9c891d 100644
--- a/library/cpp/comptable/usage/usage.cpp
+++ b/library/cpp/comptable/usage/usage.cpp
@@ -3,9 +3,9 @@
#include <util/random/random.h>
#include <util/random/fast.h>
-#include <time.h>
-#include <stdlib.h>
-
+#include <time.h>
+#include <stdlib.h>
+
using namespace NCompTable;
template <bool HQ>
diff --git a/library/cpp/containers/comptrie/comptrie_ut.cpp b/library/cpp/containers/comptrie/comptrie_ut.cpp
index 74bee09b5d..a5b2ea248a 100644
--- a/library/cpp/containers/comptrie/comptrie_ut.cpp
+++ b/library/cpp/containers/comptrie/comptrie_ut.cpp
@@ -862,14 +862,14 @@ void TCompactTrieTest::TestMergeFromFile() {
{
TCompactTrieBuilder<> b;
- UNIT_ASSERT(b.AddSubtreeInFile("com.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-com"));
+ UNIT_ASSERT(b.AddSubtreeInFile("com.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-com"));
UNIT_ASSERT(b.Add("org.kernel", 22));
- UNIT_ASSERT(b.AddSubtreeInFile("ru.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-ru"));
+ UNIT_ASSERT(b.AddSubtreeInFile("ru.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-ru"));
TUnbufferedFileOutput out(GetSystemTempDir() + "/TCompactTrieTest-TestMerge-res");
b.Save(out);
}
- TCompactTrie<> trie(TBlob::FromFileSingleThreaded(GetSystemTempDir() + "/TCompactTrieTest-TestMerge-res"));
+ TCompactTrie<> trie(TBlob::FromFileSingleThreaded(GetSystemTempDir() + "/TCompactTrieTest-TestMerge-res"));
UNIT_ASSERT_VALUES_EQUAL(12u, trie.Get("ru.yandex"));
UNIT_ASSERT_VALUES_EQUAL(13u, trie.Get("ru.google"));
UNIT_ASSERT_VALUES_EQUAL(14u, trie.Get("ru.mail"));
diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp
index 8b372496a2..82809e63de 100644
--- a/library/cpp/coroutine/engine/coroutine_ut.cpp
+++ b/library/cpp/coroutine/engine/coroutine_ut.cpp
@@ -509,7 +509,7 @@ namespace NCoroWaitWakeLivelockBug {
struct TSubState {
TSubState(TState& parent, ui32 self)
: Parent(parent)
- , Name(TStringBuilder() << "Sub" << self)
+ , Name(TStringBuilder() << "Sub" << self)
, Self(self)
{
UNIT_ASSERT(self < 2);
diff --git a/library/cpp/dbg_output/dump.h b/library/cpp/dbg_output/dump.h
index c7efa105ee..99da7e5802 100644
--- a/library/cpp/dbg_output/dump.h
+++ b/library/cpp/dbg_output/dump.h
@@ -97,10 +97,10 @@ namespace NPrivate {
template <class T, class TColorScheme = DBG_OUTPUT_DEFAULT_COLOR_SCHEME>
static inline ::NPrivate::TDbgDump<T, ::NPrivate::TTraitsShallow<TColorScheme>> DbgDump(const T& t) {
- return {std::addressof(t)};
+ return {std::addressof(t)};
}
template <class T, class TColorScheme = DBG_OUTPUT_DEFAULT_COLOR_SCHEME>
static inline ::NPrivate::TDbgDump<T, ::NPrivate::TTraitsDeep<TColorScheme>> DbgDumpDeep(const T& t) {
- return {std::addressof(t)};
+ return {std::addressof(t)};
}
diff --git a/library/cpp/deprecated/mapped_file/mapped_file.cpp b/library/cpp/deprecated/mapped_file/mapped_file.cpp
index b0e4511299..02dbe5d62e 100644
--- a/library/cpp/deprecated/mapped_file/mapped_file.cpp
+++ b/library/cpp/deprecated/mapped_file/mapped_file.cpp
@@ -10,16 +10,16 @@ TMappedFile::TMappedFile(TFileMap* map, const char* dbgName) {
i64 len = Map_->Length();
if (Hi32(len) != 0 && sizeof(size_t) <= sizeof(ui32))
ythrow yexception() << "File '" << dbgName << "' mapping error: " << len << " too large";
-
+
Map_->Map(0, static_cast<size_t>(len));
-}
-
+}
+
TMappedFile::TMappedFile(const TFile& file, TFileMap::EOpenMode om, const char* dbgName)
: Map_(nullptr)
-{
+{
init(file, om, dbgName);
-}
-
+}
+
void TMappedFile::precharge(size_t off, size_t size) const {
if (!Map_)
return;
@@ -39,24 +39,24 @@ void TMappedFile::init(const TString& name, size_t length, TFileMap::EOpenMode o
THolder<TFileMap> map(new TFileMap(name, length, om));
TMappedFile newFile(map.Get(), name.data());
Y_UNUSED(map.Release());
- newFile.swap(*this);
- newFile.term();
-}
+ newFile.swap(*this);
+ newFile.term();
+}
void TMappedFile::init(const TFile& file, TFileMap::EOpenMode om, const char* dbgName) {
THolder<TFileMap> map(new TFileMap(file, om));
TMappedFile newFile(map.Get(), dbgName);
Y_UNUSED(map.Release());
- newFile.swap(*this);
- newFile.term();
+ newFile.swap(*this);
+ newFile.term();
}
void TMappedFile::init(const TString& name, TFileMap::EOpenMode om) {
THolder<TFileMap> map(new TFileMap(name, om));
TMappedFile newFile(map.Get(), name.data());
Y_UNUSED(map.Release());
- newFile.swap(*this);
- newFile.term();
+ newFile.swap(*this);
+ newFile.term();
}
void TMappedFile::flush() {
diff --git a/library/cpp/deprecated/mapped_file/mapped_file.h b/library/cpp/deprecated/mapped_file/mapped_file.h
index 45859ed65a..93fd9cf000 100644
--- a/library/cpp/deprecated/mapped_file/mapped_file.h
+++ b/library/cpp/deprecated/mapped_file/mapped_file.h
@@ -18,9 +18,9 @@ class TMappedFile {
private:
TFileMap* Map_;
-private:
+private:
TMappedFile(TFileMap* map, const char* dbgName);
-
+
public:
TMappedFile() {
Map_ = nullptr;
@@ -36,11 +36,11 @@ public:
}
TMappedFile(const TFile& file, TFileMap::EOpenMode om = TFileMap::oRdOnly, const char* dbgName = "unknown");
-
+
void init(const TString& name);
void init(const TString& name, TFileMap::EOpenMode om);
-
+
void init(const TString& name, size_t length, TFileMap::EOpenMode om);
void init(const TFile&, TFileMap::EOpenMode om = TFileMap::oRdOnly, const char* dbgName = "unknown");
@@ -65,8 +65,8 @@ public:
}
void precharge(size_t pos = 0, size_t size = (size_t)-1) const;
-
+
void swap(TMappedFile& file) noexcept {
DoSwap(Map_, file.Map_);
- }
+ }
};
diff --git a/library/cpp/digest/old_crc/crc.h b/library/cpp/digest/old_crc/crc.h
index 4a3ce6d05e..0b99a45a7d 100644
--- a/library/cpp/digest/old_crc/crc.h
+++ b/library/cpp/digest/old_crc/crc.h
@@ -79,11 +79,11 @@ namespace NCrcPrivate {
#undef DEF_CRC_FUNC
}
-template <class T>
+template <class T>
static inline T Crc(const void* buf, size_t len, T init) {
return (T)NCrcPrivate::TCrcHelper<8 * sizeof(T)>::Crc(buf, len, init);
-}
-
+}
+
template <class T>
static inline T Crc(const void* buf, size_t len) {
return Crc<T>(buf, len, (T)NCrcPrivate::TCrcHelper<8 * sizeof(T)>::Init);
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 128583bdd7..fbd127a652 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -1,4 +1,4 @@
-#include "http.h"
+#include "http.h"
#include "http_ex.h"
#include <library/cpp/threading/equeue/equeue.h>
@@ -7,25 +7,25 @@
#include <util/generic/cast.h>
#include <util/generic/intrlist.h>
#include <util/generic/yexception.h>
-#include <util/network/address.h>
+#include <util/network/address.h>
#include <util/network/socket.h>
-#include <util/network/poller.h>
-#include <util/system/atomic.h>
-#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
-#include <util/system/defaults.h>
-#include <util/system/event.h>
-#include <util/system/mutex.h>
+#include <util/network/poller.h>
+#include <util/system/atomic.h>
+#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
+#include <util/system/defaults.h>
+#include <util/system/event.h>
+#include <util/system/mutex.h>
#include <util/system/pipe.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/thread/factory.h>
-#include <cerrno>
-#include <cstring>
-#include <ctime>
-
-#include <sys/stat.h>
-#include <sys/types.h>
+#include <cerrno>
+#include <cstring>
+#include <ctime>
+#include <sys/stat.h>
+#include <sys/types.h>
+
using namespace NAddr;
namespace {
@@ -51,18 +51,18 @@ namespace {
}
class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> {
-public:
+public:
TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef);
~TClientConnection() override;
void OnPollEvent(TInstant now) override;
inline void Activate(TInstant now) noexcept;
- inline void DeActivate();
+ inline void DeActivate();
inline void Reject();
-public:
- TSocket Socket_;
+public:
+ TSocket Socket_;
NAddr::IRemoteAddrRef ListenerSockAddrRef_;
THttpServer::TImpl* HttpServ_ = nullptr;
bool Reject_ = false;
@@ -72,70 +72,70 @@ public:
};
class THttpServer::TImpl {
-public:
- class TConnections {
+public:
+ class TConnections {
public:
inline TConnections(TSocketPoller* poller, const THttpServerOptions& options)
- : Poller_(poller)
+ : Poller_(poller)
, Options(options)
- {
- }
+ {
+ }
inline ~TConnections() {
- }
+ }
inline void Add(TClientConnection* c) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
- Conns_.PushBack(c);
- Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
- }
+ Conns_.PushBack(c);
+ Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
+ }
inline void Erase(TClientConnection* c, TInstant now) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
EraseUnsafe(c);
if (Options.ExpirationTimeout > TDuration::Zero()) {
TryRemovingUnsafe(now - Options.ExpirationTimeout);
}
- }
+ }
inline void Clear() noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
- Conns_.Clear();
- }
+ Conns_.Clear();
+ }
inline bool RemoveOld(TInstant border) noexcept {
- TGuard<TMutex> g(Mutex_);
+ TGuard<TMutex> g(Mutex_);
return TryRemovingUnsafe(border);
}
bool TryRemovingUnsafe(TInstant border) noexcept {
- if (Conns_.Empty()) {
+ if (Conns_.Empty()) {
return false;
- }
- TClientConnection* c = &*(Conns_.Begin());
+ }
+ TClientConnection* c = &*(Conns_.Begin());
if (c->LastUsed > border) {
return false;
}
EraseUnsafe(c);
- delete c;
+ delete c;
return true;
- }
+ }
void EraseUnsafe(TClientConnection* c) noexcept {
Poller_->Unwait(c->Socket_);
c->Unlink();
}
- public:
- TMutex Mutex_;
- TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
+ public:
+ TMutex Mutex_;
+ TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
TSocketPoller* Poller_ = nullptr;
const THttpServerOptions& Options;
- };
+ };
- static void* ListenSocketFunction(void* param) {
+ static void* ListenSocketFunction(void* param) {
try {
((TImpl*)param)->ListenSocket();
} catch (...) {
@@ -143,19 +143,19 @@ public:
}
return nullptr;
- }
+ }
- TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
- THolder<TClientRequest> obj(Cb_->CreateClient());
+ TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
+ THolder<TClientRequest> obj(Cb_->CreateClient());
- obj->Conn_.Reset(c.Release());
-
- return obj;
- }
+ obj->Conn_.Reset(c.Release());
+ return obj;
+ }
+
void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) {
if (MaxRequestsReached()) {
- Cb_->OnMaxConn();
+ Cb_->OnMaxConn();
bool wasRemoved = Connections->RemoveOld(TInstant::Max());
if (!wasRemoved && Options_.RejectExcessConnections) {
(new TClientConnection(s, this, listenerSockAddrRef))->Reject();
@@ -166,27 +166,27 @@ public:
auto connection = new TClientConnection(s, this, listenerSockAddrRef);
connection->LastUsed = now;
connection->DeActivate();
- }
+ }
- void SaveErrorCode() {
+ void SaveErrorCode() {
ErrorCode = WSAGetLastError();
- }
+ }
int GetErrorCode() const {
return ErrorCode;
- }
+ }
const char* GetError() const {
return LastSystemErrorText(ErrorCode);
- }
+ }
- bool Start() {
- Poller.Reset(new TSocketPoller());
+ bool Start() {
+ Poller.Reset(new TSocketPoller());
Connections.Reset(new TConnections(Poller.Get(), Options_));
-
+
// Start the listener thread
- ListenerRunningOK = false;
-
+ ListenerRunningOK = false;
+
// throws on error
TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd);
@@ -195,31 +195,31 @@ public:
Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble);
- ListenStartEvent.Reset();
- try {
+ ListenStartEvent.Reset();
+ try {
ListenThread.Reset(new TThread(ListenSocketFunction, this));
ListenThread->Start();
- } catch (const yexception&) {
- SaveErrorCode();
- return false;
+ } catch (const yexception&) {
+ SaveErrorCode();
+ return false;
}
// Wait until the thread has completely started and return the success indicator
- ListenStartEvent.Wait();
+ ListenStartEvent.Wait();
- return ListenerRunningOK;
- }
+ return ListenerRunningOK;
+ }
- void Wait() {
- Cb_->OnWait();
+ void Wait() {
+ Cb_->OnWait();
TGuard<TMutex> g(StopMutex);
if (ListenThread) {
ListenThread->Join();
ListenThread.Reset(nullptr);
}
- }
+ }
- void Stop() {
+ void Stop() {
Shutdown();
TGuard<TMutex> g(StopMutex);
@@ -228,19 +228,19 @@ public:
ListenThread.Reset(nullptr);
}
- while (ConnectionCount) {
- usleep(10000);
- Connections->Clear();
+ while (ConnectionCount) {
+ usleep(10000);
+ Connections->Clear();
}
- Connections.Destroy();
- Poller.Destroy();
- }
+ Connections.Destroy();
+ Poller.Destroy();
+ }
- void Shutdown() {
+ void Shutdown() {
ListenWakeupWriteFd.Write("", 1);
// ignore result
- }
+ }
void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
struct TFailRequest: public THttpClientRequestEx {
@@ -257,20 +257,20 @@ public:
ProcessFailRequest(0);
return true;
}
- };
+ };
if (!fail && Requests->Add(req.Get())) {
Y_UNUSED(req.Release());
- } else {
- req = new TFailRequest(req);
+ } else {
+ req = new TFailRequest(req);
- if (FailRequests->Add(req.Get())) {
+ if (FailRequests->Add(req.Get())) {
Y_UNUSED(req.Release());
} else {
- Cb_->OnFailRequest(-1);
+ Cb_->OnFailRequest(-1);
}
}
- }
+ }
size_t GetRequestQueueSize() const {
return Requests->Size();
@@ -305,8 +305,8 @@ public:
if (s == INVALID_SOCKET) {
ythrow yexception() << "accept: " << LastSystemErrorText();
- }
-
+ }
+
Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_);
}
@@ -318,13 +318,13 @@ public:
TSocket S_;
TImpl* Server_ = nullptr;
NAddr::IRemoteAddrRef SockAddrRef_;
- };
+ };
- void ListenSocket() {
+ void ListenSocket() {
TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
ErrorCode = 0;
- TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
+ TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
std::function<void(TSocket)> callback = [&](TSocket socket) {
THolder<TListenSocket> ls(new TListenSocket(socket, this));
@@ -337,65 +337,65 @@ public:
ListenStartEvent.Signal();
return;
- }
+ }
- Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
- FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
- Cb_->OnListenStart();
- ListenerRunningOK = true;
- ListenStartEvent.Signal();
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
+ Cb_->OnListenStart();
+ ListenerRunningOK = true;
+ ListenStartEvent.Signal();
TVector<void*> events;
- events.resize(1);
+ events.resize(1);
TInstant now = TInstant::Now();
for (;;) {
- try {
+ try {
const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;
const size_t ret = Poller->WaitD(events.data(), events.size(), deadline);
now = TInstant::Now();
- for (size_t i = 0; i < ret; ++i) {
+ for (size_t i = 0; i < ret; ++i) {
((IPollAble*)events[i])->OnPollEvent(now);
- }
+ }
if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) {
Connections->RemoveOld(now - Options_.ExpirationTimeout);
}
// When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call
- // RemoveOld and destroy other IPollAble* objects in the
- // poller. Thus in this case we can safely process only one
- // event from the poller at a time.
+ // RemoveOld and destroy other IPollAble* objects in the
+ // poller. Thus in this case we can safely process only one
+ // event from the poller at a time.
if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) {
if (ret >= events.size()) {
- events.resize(ret * 2);
+ events.resize(ret * 2);
}
}
} catch (const TShouldStop&) {
break;
- } catch (...) {
- Cb_->OnException();
+ } catch (...) {
+ Cb_->OnException();
}
}
- while (!Reqs.Empty()) {
- THolder<TListenSocket> ls(Reqs.PopFront());
+ while (!Reqs.Empty()) {
+ THolder<TListenSocket> ls(Reqs.PopFront());
- Poller->Unwait(ls->GetSocket());
+ Poller->Unwait(ls->GetSocket());
}
- Requests->Stop();
- FailRequests->Stop();
- Cb_->OnListenStop();
- }
+ Requests->Stop();
+ FailRequests->Stop();
+ Cb_->OnListenStop();
+ }
- void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
- Requests->Stop();
- Options_.nThreads = nTh;
- Options_.MaxQueueSize = maxQS;
- Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
- }
+ void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
+ Requests->Stop();
+ Options_.nThreads = nTh;
+ Options_.MaxQueueSize = maxQS;
+ Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
+ }
TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_)
: Requests(mainWorkers)
@@ -415,29 +415,29 @@ public:
options) {
}
- ~TImpl() {
- try {
- Stop();
- } catch (...) {
+ ~TImpl() {
+ try {
+ Stop();
+ } catch (...) {
}
- }
+ }
inline const TOptions& Options() const noexcept {
- return Options_;
- }
+ return Options_;
+ }
inline void DecreaseConnections() noexcept {
- AtomicDecrement(ConnectionCount);
- }
+ AtomicDecrement(ConnectionCount);
+ }
inline void IncreaseConnections() noexcept {
- AtomicIncrement(ConnectionCount);
- }
-
- inline i64 GetClientCount() const {
- return AtomicGet(ConnectionCount);
- }
-
+ AtomicIncrement(ConnectionCount);
+ }
+
+ inline i64 GetClientCount() const {
+ return AtomicGet(ConnectionCount);
+ }
+
inline bool MaxRequestsReached() const {
return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
}
@@ -449,11 +449,11 @@ public:
TMtpQueueRef Requests;
TMtpQueueRef FailRequests;
TAtomic ConnectionCount = 0;
- THolder<TSocketPoller> Poller;
- THolder<TConnections> Connections;
+ THolder<TSocketPoller> Poller;
+ THolder<TConnections> Connections;
bool ListenerRunningOK = false;
int ErrorCode = 0;
- TOptions Options_;
+ TOptions Options_;
ICallBack* Cb_ = nullptr;
THttpServer* Parent_ = nullptr;
TWakeupPollAble WakeupPollAble;
@@ -571,7 +571,7 @@ TClientConnection::~TClientConnection() {
}
void TClientConnection::OnPollEvent(TInstant now) {
- THolder<TClientConnection> this_(this);
+ THolder<TClientConnection> this_(this);
Activate(now);
{
@@ -588,7 +588,7 @@ void TClientConnection::OnPollEvent(TInstant now) {
}
}
- THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
+ THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
AcceptMoment = now;
HttpServ_->AddRequest(obj, Reject_);
@@ -601,7 +601,7 @@ void TClientConnection::Activate(TInstant now) noexcept {
}
void TClientConnection::DeActivate() {
- HttpServ_->Connections->Add(this);
+ HttpServ_->Connections->Add(this);
}
void TClientConnection::Reject() {
@@ -677,7 +677,7 @@ void TClientRequest::ResetConnection() {
}
void TClientRequest::Process(void* ThreadSpecificResource) {
- THolder<TClientRequest> this_(this);
+ THolder<TClientRequest> this_(this);
auto* serverImpl = Conn_->HttpServ_;
diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h
index b292d38f27..d0bc92ff7d 100644
--- a/library/cpp/http/server/http.h
+++ b/library/cpp/http/server/http.h
@@ -15,75 +15,75 @@ class TClientRequest;
class TClientConnection;
class THttpServer {
- friend class TClientRequest;
- friend class TClientConnection;
+ friend class TClientRequest;
+ friend class TClientConnection;
-public:
- class ICallBack {
+public:
+ class ICallBack {
public:
- struct TFailLogData {
- int failstate;
+ struct TFailLogData {
+ int failstate;
TString url;
- };
+ };
- virtual ~ICallBack() {
- }
+ virtual ~ICallBack() {
+ }
- virtual void OnFailRequest(int /*failstate*/) {
- }
+ virtual void OnFailRequest(int /*failstate*/) {
+ }
- virtual void OnFailRequestEx(const TFailLogData& d) {
- OnFailRequest(d.failstate);
- }
+ virtual void OnFailRequestEx(const TFailLogData& d) {
+ OnFailRequest(d.failstate);
+ }
- virtual void OnException() {
- }
+ virtual void OnException() {
+ }
- virtual void OnMaxConn() {
- }
+ virtual void OnMaxConn() {
+ }
- virtual TClientRequest* CreateClient() = 0;
+ virtual TClientRequest* CreateClient() = 0;
- virtual void OnListenStart() {
- }
+ virtual void OnListenStart() {
+ }
- virtual void OnListenStop() {
- }
+ virtual void OnListenStop() {
+ }
- virtual void OnWait() {
- }
+ virtual void OnWait() {
+ }
virtual void* CreateThreadSpecificResource() {
return nullptr;
- }
+ }
- virtual void DestroyThreadSpecificResource(void*) {
- }
- };
+ virtual void DestroyThreadSpecificResource(void*) {
+ }
+ };
- typedef THttpServerOptions TOptions;
+ typedef THttpServerOptions TOptions;
typedef TSimpleSharedPtr<IThreadPool> TMtpQueueRef;
THttpServer(ICallBack* cb, const TOptions& options = TOptions(), IThreadFactory* pool = nullptr);
THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options = TOptions());
- virtual ~THttpServer();
+ virtual ~THttpServer();
- bool Start();
+ bool Start();
- // shutdown a.s.a.p.
- void Stop();
+ // shutdown a.s.a.p.
+ void Stop();
// graceful shutdown with serving all already open connections
- void Shutdown();
+ void Shutdown();
- void Wait();
- int GetErrorCode();
- const char* GetError();
- void RestartRequestThreads(ui32 nTh, ui32 maxQS);
+ void Wait();
+ int GetErrorCode();
+ const char* GetError();
+ void RestartRequestThreads(ui32 nTh, ui32 maxQS);
const TOptions& Options() const noexcept;
- i64 GetClientCount() const;
+ i64 GetClientCount() const;
- class TImpl;
+ class TImpl;
size_t GetRequestQueueSize() const;
size_t GetFailQueueSize() const;
@@ -92,30 +92,30 @@ public:
static TAtomicBase AcceptReturnsInvalidSocketCounter();
-private:
+private:
bool MaxRequestsReached() const;
private:
- THolder<TImpl> Impl_;
+ THolder<TImpl> Impl_;
};
/**
* @deprecated Use TRequestReplier instead
*/
class TClientRequest: public IObjectInQueue {
- friend class THttpServer::TImpl;
+ friend class THttpServer::TImpl;
-public:
- TClientRequest();
+public:
+ TClientRequest();
~TClientRequest() override;
inline THttpInput& Input() noexcept {
- return *HttpConn_->Input();
- }
+ return *HttpConn_->Input();
+ }
inline THttpOutput& Output() noexcept {
- return *HttpConn_->Output();
- }
+ return *HttpConn_->Output();
+ }
THttpServer* HttpServ() const noexcept;
const TSocket& Socket() const noexcept;
@@ -123,29 +123,29 @@ public:
TInstant AcceptMoment() const noexcept;
bool IsLocal() const;
- bool CheckLoopback();
- void ProcessFailRequest(int failstate);
+ bool CheckLoopback();
+ void ProcessFailRequest(int failstate);
void ReleaseConnection();
void ResetConnection();
-private:
- /*
- * Processes the request after 'connection' been created and 'Headers' been read
- * Returns 'false' if the processing must be continued by the next handler,
- * 'true' otherwise ('this' will be deleted)
- */
- virtual bool Reply(void* ThreadSpecificResource);
+private:
+ /*
+ * Processes the request after 'connection' been created and 'Headers' been read
+ * Returns 'false' if the processing must be continued by the next handler,
+ * 'true' otherwise ('this' will be deleted)
+ */
+ virtual bool Reply(void* ThreadSpecificResource);
void Process(void* ThreadSpecificResource) override;
-public:
+public:
TVector<std::pair<TString, TString>> ParsedHeaders;
TString RequestString;
-private:
- THolder<TClientConnection> Conn_;
- THolder<THttpServerConn> HttpConn_;
+private:
+ THolder<TClientConnection> Conn_;
+ THolder<THttpServerConn> HttpConn_;
};
class TRequestReplier: public TClientRequest {
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
index cc62bb988e..5f21fc935c 100644
--- a/library/cpp/http/server/http_ut.cpp
+++ b/library/cpp/http/server/http_ut.cpp
@@ -330,7 +330,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
for (int i = 0; i < 2; ++i) {
- UNIT_ASSERT(server.Start());
+ UNIT_ASSERT(server.Start());
TTestRequest r(port);
r.Content = res;
diff --git a/library/cpp/http/server/ut/ya.make b/library/cpp/http/server/ut/ya.make
index bcb4d4c0b8..47a208e867 100644
--- a/library/cpp/http/server/ut/ya.make
+++ b/library/cpp/http/server/ut/ya.make
@@ -1,6 +1,6 @@
UNITTEST_FOR(library/cpp/http/server)
-OWNER(pg)
+OWNER(pg)
SIZE(MEDIUM)
diff --git a/library/cpp/lfalloc/lf_allocX64.h b/library/cpp/lfalloc/lf_allocX64.h
index fd2a906d6f..979e26c699 100644
--- a/library/cpp/lfalloc/lf_allocX64.h
+++ b/library/cpp/lfalloc/lf_allocX64.h
@@ -98,17 +98,17 @@ static inline long AtomicSub(TAtomic& a, long b) {
#include <new>
#include <errno.h>
-#if defined(_linux_)
+#if defined(_linux_)
#include <linux/futex.h>
#include <sys/syscall.h>
#if !defined(MADV_HUGEPAGE)
#define MADV_HUGEPAGE 14
-#endif
+#endif
#if !defined(MAP_HUGETLB)
#define MAP_HUGETLB 0x40000
#endif
#endif
-
+
#define PERTHREAD __thread
#endif
diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h
index b79c0cc355..573458ba72 100644
--- a/library/cpp/messagebus/config/netaddr.h
+++ b/library/cpp/messagebus/config/netaddr.h
@@ -1,6 +1,6 @@
#pragma once
-#include <util/digest/numeric.h>
+#include <util/digest/numeric.h>
#include <util/generic/hash.h>
#include <util/generic/ptr.h>
#include <util/generic/strbuf.h>
@@ -74,13 +74,13 @@ namespace NBus {
switch (s->sa_family) {
case AF_INET:
return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa->sin_addr), sizeof(sa->sin_addr))), IntHashImpl(sa->sin_port));
-
+
case AF_INET6:
return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa6->sin6_addr), sizeof(sa6->sin6_addr))), IntHashImpl(sa6->sin6_port));
}
-
+
return ComputeHash(TStringBuf(reinterpret_cast<const char*>(s), a.Len()));
- }
+ }
};
-
+
}
diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp
index d9411bb5db..d9436f15d7 100644
--- a/library/cpp/messagebus/coreconn.cpp
+++ b/library/cpp/messagebus/coreconn.cpp
@@ -2,10 +2,10 @@
#include "remote_connection.h"
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
#include <util/generic/yexception.h>
#include <util/network/socket.h>
-#include <util/string/util.h>
+#include <util/string/util.h>
#include <util/system/thread.h>
namespace NBus {
diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h
index fca228d82e..f6ec07bef4 100644
--- a/library/cpp/messagebus/coreconn.h
+++ b/library/cpp/messagebus/coreconn.h
@@ -5,24 +5,24 @@
/// \brief Definitions for asynchonous connection queue
#include "base.h"
-#include "event_loop.h"
+#include "event_loop.h"
#include "netaddr.h"
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
#include <util/generic/list.h>
-#include <util/generic/map.h>
-#include <util/generic/set.h>
+#include <util/generic/map.h>
+#include <util/generic/set.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
#include <util/network/address.h>
#include <util/network/ip.h>
-#include <util/network/poller.h>
-#include <util/string/util.h>
-#include <util/system/condvar.h>
+#include <util/network/poller.h>
+#include <util/string/util.h>
+#include <util/system/condvar.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
-#include <util/thread/lfqueue.h>
+#include <util/thread/lfqueue.h>
#include <deque>
#include <utility>
@@ -31,9 +31,9 @@
#undef NO_ERROR
#endif
-#define BUS_WORKER_CONDVAR
-//#define BUS_WORKER_MIXED
-
+#define BUS_WORKER_CONDVAR
+//#define BUS_WORKER_MIXED
+
namespace NBus {
class TBusConnection;
class TBusConnectionFactory;
@@ -64,4 +64,4 @@ namespace NBus {
POLL_WRITE
};
-}
+}
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index f685135bed..b1209d2b5c 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -5,50 +5,50 @@
#include <util/generic/hash.h>
#include <util/network/pair.h>
-#include <util/network/poller.h>
+#include <util/network/poller.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
-#include <util/system/yassert.h>
+#include <util/system/yassert.h>
#include <util/thread/lfqueue.h>
-
+
#include <errno.h>
-using namespace NEventLoop;
-
-namespace {
+using namespace NEventLoop;
+
+namespace {
enum ERunningState {
EVENT_LOOP_CREATED,
EVENT_LOOP_RUNNING,
EVENT_LOOP_STOPPED,
};
- enum EOperation {
- OP_READ = 1,
- OP_WRITE = 2,
- OP_READ_WRITE = OP_READ | OP_WRITE,
- };
-}
-
-class TChannel::TImpl {
-public:
+ enum EOperation {
+ OP_READ = 1,
+ OP_WRITE = 2,
+ OP_READ_WRITE = OP_READ | OP_WRITE,
+ };
+}
+
+class TChannel::TImpl {
+public:
TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie);
~TImpl();
-
- void EnableRead();
- void DisableRead();
- void EnableWrite();
- void DisableWrite();
-
- void Unregister();
-
- SOCKET GetSocket() const;
+
+ void EnableRead();
+ void DisableRead();
+ void EnableWrite();
+ void DisableWrite();
+
+ void Unregister();
+
+ SOCKET GetSocket() const;
TSocket GetSocketPtr() const;
-
+
void Update(int pollerFlags, bool enable);
void CallHandler();
- TEventLoop::TImpl* EventLoop;
+ TEventLoop::TImpl* EventLoop;
TSocket Socket;
TEventHandlerPtr EventHandler;
void* Cookie;
@@ -57,130 +57,130 @@ public:
int CurrentFlags;
bool Close;
-};
-
-class TEventLoop::TImpl {
-public:
+};
+
+class TEventLoop::TImpl {
+public:
TImpl(const char* name);
-
- void Run();
- void Wakeup();
- void Stop();
-
+
+ void Run();
+ void Wakeup();
+ void Stop();
+
TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie);
- void Unregister(SOCKET socket);
-
+ void Unregister(SOCKET socket);
+
typedef THashMap<SOCKET, TChannelPtr> TData;
-
+
void AddToPoller(SOCKET socket, void* cookie, int flags);
-
+
TMutex Mutex;
-
+
const char* Name;
TAtomic RunningState;
TAtomic StopSignal;
TSystemEvent StoppedEvent;
- TData Data;
-
+ TData Data;
+
TLockFreeQueue<SOCKET> SocketsToRemove;
- TSocketPoller Poller;
- TSocketHolder WakeupReadSocket;
- TSocketHolder WakeupWriteSocket;
-};
-
-TChannel::~TChannel() {
-}
-
-void TChannel::EnableRead() {
- Impl->EnableRead();
-}
-
-void TChannel::DisableRead() {
- Impl->DisableRead();
-}
-
-void TChannel::EnableWrite() {
- Impl->EnableWrite();
-}
-
-void TChannel::DisableWrite() {
- Impl->DisableWrite();
-}
-
-void TChannel::Unregister() {
- Impl->Unregister();
-}
-
-SOCKET TChannel::GetSocket() const {
- return Impl->GetSocket();
-}
-
+ TSocketPoller Poller;
+ TSocketHolder WakeupReadSocket;
+ TSocketHolder WakeupWriteSocket;
+};
+
+TChannel::~TChannel() {
+}
+
+void TChannel::EnableRead() {
+ Impl->EnableRead();
+}
+
+void TChannel::DisableRead() {
+ Impl->DisableRead();
+}
+
+void TChannel::EnableWrite() {
+ Impl->EnableWrite();
+}
+
+void TChannel::DisableWrite() {
+ Impl->DisableWrite();
+}
+
+void TChannel::Unregister() {
+ Impl->Unregister();
+}
+
+SOCKET TChannel::GetSocket() const {
+ return Impl->GetSocket();
+}
+
TSocket TChannel::GetSocketPtr() const {
return Impl->GetSocketPtr();
}
TChannel::TChannel(TImpl* impl)
: Impl(impl)
-{
-}
-
+{
+}
+
TEventLoop::TEventLoop(const char* name)
: Impl(new TImpl(name))
-{
-}
-
+{
+}
+
TEventLoop::~TEventLoop() {
-}
-
-void TEventLoop::Run() {
- Impl->Run();
-}
-
-void TEventLoop::Stop() {
- Impl->Stop();
-}
-
+}
+
+void TEventLoop::Run() {
+ Impl->Run();
+}
+
+void TEventLoop::Stop() {
+ Impl->Stop();
+}
+
bool TEventLoop::IsRunning() {
return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
}
TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
return Impl->Register(socket, eventHandler, cookie);
-}
-
+}
+
TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie)
- : EventLoop(eventLoop)
- , Socket(socket)
+ : EventLoop(eventLoop)
+ , Socket(socket)
, EventHandler(eventHandler)
, Cookie(cookie)
, CurrentFlags(0)
, Close(false)
-{
-}
-
+{
+}
+
TChannel::TImpl::~TImpl() {
Y_ASSERT(Close);
}
-void TChannel::TImpl::EnableRead() {
+void TChannel::TImpl::EnableRead() {
Update(OP_READ, true);
-}
-
-void TChannel::TImpl::DisableRead() {
+}
+
+void TChannel::TImpl::DisableRead() {
Update(OP_READ, false);
-}
-
-void TChannel::TImpl::EnableWrite() {
+}
+
+void TChannel::TImpl::EnableWrite() {
Update(OP_WRITE, true);
-}
-
-void TChannel::TImpl::DisableWrite() {
+}
+
+void TChannel::TImpl::DisableWrite() {
Update(OP_WRITE, false);
-}
-
-void TChannel::TImpl::Unregister() {
+}
+
+void TChannel::TImpl::Unregister() {
TGuard<TMutex> guard(Mutex);
if (Close) {
@@ -196,8 +196,8 @@ void TChannel::TImpl::Unregister() {
EventLoop->SocketsToRemove.Enqueue(Socket);
EventLoop->Wakeup();
-}
-
+}
+
void TChannel::TImpl::Update(int flags, bool enable) {
TGuard<TMutex> guard(Mutex);
@@ -221,10 +221,10 @@ void TChannel::TImpl::Update(int flags, bool enable) {
CurrentFlags = newFlags;
}
-SOCKET TChannel::TImpl::GetSocket() const {
- return Socket;
-}
-
+SOCKET TChannel::TImpl::GetSocket() const {
+ return Socket;
+}
+
TSocket TChannel::TImpl::GetSocketPtr() const {
return Socket;
}
@@ -256,27 +256,27 @@ TEventLoop::TImpl::TImpl(const char* name)
: Name(name)
, RunningState(EVENT_LOOP_CREATED)
, StopSignal(0)
-{
- SOCKET wakeupSockets[2];
-
+{
+ SOCKET wakeupSockets[2];
+
if (SocketPair(wakeupSockets) < 0) {
Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText());
}
-
- TSocketHolder wakeupReadSocket(wakeupSockets[0]);
- TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
-
- WakeupReadSocket.Swap(wakeupReadSocket);
- WakeupWriteSocket.Swap(wakeupWriteSocket);
-
- SetNonBlock(WakeupWriteSocket, true);
- SetNonBlock(WakeupReadSocket, true);
-
- Poller.WaitRead(WakeupReadSocket,
+
+ TSocketHolder wakeupReadSocket(wakeupSockets[0]);
+ TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
+
+ WakeupReadSocket.Swap(wakeupReadSocket);
+ WakeupWriteSocket.Swap(wakeupWriteSocket);
+
+ SetNonBlock(WakeupWriteSocket, true);
+ SetNonBlock(WakeupReadSocket, true);
+
+ Poller.WaitRead(WakeupReadSocket,
reinterpret_cast<void*>(this));
-}
-
-void TEventLoop::TImpl::Run() {
+}
+
+void TEventLoop::TImpl::Run() {
bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
Y_VERIFY(res, "Invalid mbus event loop state");
@@ -285,30 +285,30 @@ void TEventLoop::TImpl::Run() {
}
while (AtomicGet(StopSignal) == 0) {
- void* cookies[1024];
+ void* cookies[1024];
const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
-
- void** end = cookies + count;
- for (void** c = cookies; c != end; ++c) {
+
+ void** end = cookies + count;
+ for (void** c = cookies; c != end; ++c) {
TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c);
-
+
if (*c == this) {
- char buf[0x1000];
+ char buf[0x1000];
if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) {
Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText());
}
- continue;
- }
-
+ continue;
+ }
+
s->CallHandler();
- }
+ }
SOCKET socket = -1;
while (SocketsToRemove.Dequeue(&socket)) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(Data.erase(socket) == 1, "must be removed once");
}
- }
+ }
{
TGuard<TMutex> guard(Mutex);
@@ -325,9 +325,9 @@ void TEventLoop::TImpl::Run() {
Y_VERIFY(res);
StoppedEvent.Signal();
-}
-
-void TEventLoop::TImpl::Stop() {
+}
+
+void TEventLoop::TImpl::Stop() {
AtomicSet(StopSignal, 1);
if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
@@ -335,36 +335,36 @@ void TEventLoop::TImpl::Stop() {
StoppedEvent.WaitI();
}
-}
-
+}
+
TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie));
-
+
TGuard<TMutex> guard(Mutex);
-
+
Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted");
-
+
return channel;
-}
-
-void TEventLoop::TImpl::Wakeup() {
+}
+
+void TEventLoop::TImpl::Wakeup() {
if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) {
if (LastSystemError() != EAGAIN) {
Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText());
}
- }
-}
-
-void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
- if (flags == OP_READ) {
+ }
+}
+
+void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
+ if (flags == OP_READ) {
Poller.WaitReadOneShot(socket, cookie);
- } else if (flags == OP_WRITE) {
+ } else if (flags == OP_WRITE) {
Poller.WaitWriteOneShot(socket, cookie);
- } else if (flags == OP_READ_WRITE) {
+ } else if (flags == OP_READ_WRITE) {
Poller.WaitReadWriteOneShot(socket, cookie);
- } else {
+ } else {
Y_FAIL("Wrong flags: %d", int(flags));
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/event_loop.h b/library/cpp/messagebus/event_loop.h
index d5b0a53b0c..677ade2fff 100644
--- a/library/cpp/messagebus/event_loop.h
+++ b/library/cpp/messagebus/event_loop.h
@@ -1,72 +1,72 @@
-#pragma once
-
+#pragma once
+
#include <util/generic/object_counter.h>
-#include <util/generic/ptr.h>
-#include <util/network/init.h>
+#include <util/generic/ptr.h>
+#include <util/network/init.h>
#include <util/network/socket.h>
-
-namespace NEventLoop {
- struct IEventHandler
+
+namespace NEventLoop {
+ struct IEventHandler
: public TAtomicRefCount<IEventHandler> {
virtual void HandleEvent(SOCKET socket, void* cookie) = 0;
virtual ~IEventHandler() {
}
- };
-
- typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr;
-
- class TEventLoop;
-
+ };
+
+ typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr;
+
+ class TEventLoop;
+
// TODO: make TChannel itself a pointer
// to avoid confusion with Drop and Unregister
- class TChannel
+ class TChannel
: public TAtomicRefCount<TChannel> {
- public:
- ~TChannel();
-
- void EnableRead();
- void DisableRead();
- void EnableWrite();
- void DisableWrite();
-
- void Unregister();
-
- SOCKET GetSocket() const;
+ public:
+ ~TChannel();
+
+ void EnableRead();
+ void DisableRead();
+ void EnableWrite();
+ void DisableWrite();
+
+ void Unregister();
+
+ SOCKET GetSocket() const;
TSocket GetSocketPtr() const;
-
- private:
- class TImpl;
- friend class TEventLoop;
-
+
+ private:
+ class TImpl;
+ friend class TEventLoop;
+
TObjectCounter<TChannel> ObjectCounter;
TChannel(TImpl*);
-
- private:
- THolder<TImpl> Impl;
- };
-
- typedef TIntrusivePtr<TChannel> TChannelPtr;
-
- class TEventLoop {
- public:
+
+ private:
+ THolder<TImpl> Impl;
+ };
+
+ typedef TIntrusivePtr<TChannel> TChannelPtr;
+
+ class TEventLoop {
+ public:
TEventLoop(const char* name = nullptr);
- ~TEventLoop();
-
- void Run();
- void Stop();
+ ~TEventLoop();
+
+ void Run();
+ void Stop();
bool IsRunning();
-
+
TChannelPtr Register(TSocket socket, TEventHandlerPtr, void* cookie = nullptr);
-
- private:
- class TImpl;
- friend class TChannel;
-
+
+ private:
+ class TImpl;
+ friend class TChannel;
+
TObjectCounter<TEventLoop> ObjectCounter;
- private:
- THolder<TImpl> Impl;
- };
-
-}
+ private:
+ THolder<TImpl> Impl;
+ };
+
+}
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 3474d62705..9176496252 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -56,7 +56,7 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e
}
TBusMessageQueue::~TBusMessageQueue() {
- Stop();
+ Stop();
}
void TBusMessageQueue::Stop() {
@@ -127,7 +127,7 @@ TString TBusMessageQueue::GetStatus(ui16 flags) const {
TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
Add(session.Get());
- return session.Get();
+ return session.Get();
}
TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
@@ -189,10 +189,10 @@ void TBusMessageQueue::DestroyAllSessions() {
}
}
-void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) {
- Scheduler.Schedule(i);
+void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) {
+ Scheduler.Schedule(i);
}
-
+
TString TBusMessageQueue::GetNameInternal() const {
return Config.Name;
}
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 24bd778799..8a64811979 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -79,19 +79,19 @@ namespace NBus {
TBusModuleImpl* const Module;
};
-
+
struct TModuleServerHandler
: public IBusServerHandler {
TModuleServerHandler(TBusModuleImpl* module)
: Module(module)
{
}
-
+
void OnMessage(TOnMessageContext& msg) override;
-
+
TBusModuleImpl* const Module;
};
-
+
struct TBusModuleImpl: public TBusModuleInternal {
TBusModule* const Module;
@@ -677,7 +677,7 @@ namespace NBus {
return true;
}
-
+
bool TBusModule::Shutdown() {
Impl->Shutdown();
@@ -702,7 +702,7 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
Impl->Queue = queue;
return true;
}
-
+
int TBusModule::GetModuleSessionInFlight() const {
return Impl->Size();
}
@@ -781,11 +781,11 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) {
if (jobCount == 0) {
ShutdownCondVar.BroadCast();
}
- }
+ }
}
job->JobStorageIterator = TList<TJobRunner*>::iterator();
-}
+}
void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index 8d1c4a5d52..1b75c4df46 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -407,4 +407,4 @@ namespace NBus {
TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config);
};
-}
+}
diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp
index 3bc421944f..7bd6e115c7 100644
--- a/library/cpp/messagebus/remote_client_session.cpp
+++ b/library/cpp/messagebus/remote_client_session.cpp
@@ -1,10 +1,10 @@
#include "remote_client_session.h"
-
+
#include "mb_lwtrace.h"
#include "remote_client_connection.h"
#include <library/cpp/messagebus/scheduler/scheduler.h>
-
+
#include <util/generic/cast.h>
#include <util/system/defaults.h>
@@ -19,9 +19,9 @@ TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue,
: TBusSessionImpl(true, queue, proto, handler, config, name)
, ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight")
, ClientHandler(handler)
-{
-}
-
+{
+}
+
TRemoteClientSession::~TRemoteClientSession() {
//Cerr << "~TRemoteClientSession" << Endl;
}
@@ -31,7 +31,7 @@ void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<
temp->swap(newMsg);
c->ReplyQueue.EnqueueAll(temp);
c->ScheduleWrite();
-}
+}
EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) {
if (Y_UNLIKELY(IsDown())) {
diff --git a/library/cpp/messagebus/remote_client_session.h b/library/cpp/messagebus/remote_client_session.h
index 7160d0dae9..f619d4d86a 100644
--- a/library/cpp/messagebus/remote_client_session.h
+++ b/library/cpp/messagebus/remote_client_session.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "remote_client_session_semaphore.h"
#include "session_impl.h"
@@ -14,7 +14,7 @@
namespace NBus {
namespace NPrivate {
using TRemoteClientSessionPtr = TIntrusivePtr<TRemoteClientSession>;
-
+
class TRemoteClientSession: public TBusClientSession, public TBusSessionImpl {
friend class TRemoteClientConnection;
friend class TInvokeOnReply;
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 22932569db..59b58f7797 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -7,9 +7,9 @@
#include "remote_client_session.h"
#include "remote_server_session.h"
#include "session_impl.h"
-
+
#include <library/cpp/messagebus/actor/what_thread_does.h>
-
+
#include <util/generic/cast.h>
#include <util/network/init.h>
#include <util/system/atomic.h>
@@ -44,7 +44,7 @@ namespace NBus {
WriterData.Status.ConnectionId = connectionId;
WriterData.Status.PeerAddr = PeerAddr;
ReaderData.Status.ConnectionId = connectionId;
-
+
const TInstant now = TInstant::Now();
WriterFillStatus();
@@ -70,7 +70,7 @@ namespace NBus {
Y_VERIFY(AtomicGet(Down));
Y_VERIFY(SendQueue.Empty());
}
-
+
bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
size_t left = Buffer.Size() - Offset;
@@ -137,7 +137,7 @@ namespace NBus {
void TRemoteConnection::Shutdown(EMessageStatus status) {
ScheduleShutdown(status);
-
+
ReaderData.ShutdownComplete.WaitI();
WriterData.ShutdownComplete.WaitI();
}
@@ -145,15 +145,15 @@ namespace NBus {
void TRemoteConnection::TryConnect() {
Y_FAIL("TryConnect is client connection only operation");
}
-
+
void TRemoteConnection::ScheduleRead() {
GetReaderActor()->Schedule();
}
-
+
void TRemoteConnection::ScheduleWrite() {
GetWriterActor()->Schedule();
}
-
+
void TRemoteConnection::WriterRotateCounters() {
if (!WriterData.TimeToRotateCounters.FetchTask()) {
return;
@@ -383,7 +383,7 @@ namespace NBus {
if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
ReaderData.Status.Incremental.BufferDrops += 1;
-
+
TBuffer temp;
// probably should use another constant
temp.Reserve(Config.DefaultBufferSize);
@@ -391,7 +391,7 @@ namespace NBus {
ReaderData.Buffer.Swap(temp);
}
-
+
return true;
}
@@ -406,7 +406,7 @@ namespace NBus {
ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
}
}
-
+
Y_ASSERT(ReaderData.Buffer.Avail() > 0);
ssize_t bytes;
@@ -465,27 +465,27 @@ namespace NBus {
if (!Session->IsSource_) {
message->SendTime = now.MilliSeconds();
}
-
+
WriterData.SendQueue.PushBack(message);
}
-
+
void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) {
BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now));
- }
-
+ }
+
void TRemoteConnection::WriterFillInFlight() {
// this is hack for TLoadBalancedProtocol
WriterFillStatus();
AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight());
}
-
+
const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
WriterRotateCounters();
WriterFillStatus();
return WriterData.Status;
}
-
+
void TRemoteConnection::WriterFillStatus() {
if (!!WriterData.Channel) {
WriterData.Status.Fd = WriterData.Channel->GetSocket();
@@ -644,11 +644,11 @@ namespace NBus {
if (WriterData.Buffer.Capacity() > MaxBufferSize) {
WriterData.Status.Incremental.BufferDrops += 1;
WriterData.Buffer.Reset();
- }
+ }
WriterData.State = WRITER_FILLING;
}
-
+
void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) {
if (Session->IsSource_) {
WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
@@ -662,11 +662,11 @@ namespace NBus {
AtomicSet(ReaderData.Down, 1);
ScheduleRead();
-
+
AtomicSet(WriterData.Down, 1);
ScheduleWrite();
}
-
+
void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const {
size_t posForAssertion = buffer.Size();
Proto->Serialize(msg, buffer);
@@ -688,12 +688,12 @@ namespace NBus {
}
}
-
+
void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
size_t pos = data->Size();
-
+
size_t dataSize;
-
+
bool compressionRequested = msg->IsCompressed();
if (compressionRequested) {
@@ -821,20 +821,20 @@ namespace NBus {
TBusMessagePtrAndHeader h(r);
r->RecvTime = now;
-
+
QuotaConsume(1, header.Size);
ReaderData.ReadMessages.push_back(h);
if (ReaderData.ReadMessages.size() >= 100) {
ReaderFlushMessages();
}
-
+
return true;
}
void TRemoteConnection::WriterFillBuffer() {
Y_ASSERT(WriterData.State == WRITER_FILLING);
-
+
Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
@@ -868,7 +868,7 @@ namespace NBus {
WriterData.CorkUntil = TInstant::Now() + Config.Cork;
}
}
-
+
size_t sizeBeforeSerialize = WriterData.Buffer.Size();
TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
@@ -952,7 +952,7 @@ namespace NBus {
void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
ResetOneWayFlag(ms);
-
+
WriterData.Status.Incremental.StatusCounter[status] += ms.size();
for (auto m : ms) {
Session->InvokeOnError(m, status);
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h
index 4538947368..ee0665774d 100644
--- a/library/cpp/messagebus/remote_connection.h
+++ b/library/cpp/messagebus/remote_connection.h
@@ -1,8 +1,8 @@
-#pragma once
-
+#pragma once
+
#include "async_result.h"
#include "defs.h"
-#include "event_loop.h"
+#include "event_loop.h"
#include "left_right_buffer.h"
#include "lfqueue_batch.h"
#include "message_ptr_and_header.h"
@@ -15,7 +15,7 @@
#include "ybus.h"
#include "misc/granup.h"
#include "misc/tokenquota.h"
-
+
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/executor.h>
#include <library/cpp/messagebus/actor/queue_for_actor.h>
@@ -96,7 +96,7 @@ namespace NBus {
void Shutdown(EMessageStatus status);
inline const TNetAddr& GetAddr() const noexcept;
-
+
private:
friend class TScheduleConnect;
friend class TWorkIO;
@@ -111,14 +111,14 @@ namespace NBus {
bool ReaderProcessBuffer();
bool ReaderFillBuffer();
void ReaderFlushMessages();
-
+
void ReadQuotaWakeup();
ui32 WriteWakeFlags() const;
-
+
virtual bool NeedInterruptRead() {
return false;
}
-
+
public:
virtual void TryConnect();
void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage);
@@ -174,7 +174,7 @@ namespace NBus {
void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status);
// takes ownership of ms
void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status);
-
+
void FireClientConnectionEvent(TClientConnectionEvent::EType);
size_t GetInFlight();
@@ -207,14 +207,14 @@ namespace NBus {
NEventLoop::TChannelPtr Channel;
ui32 SocketVersion;
-
+
TRemoteConnectionWriterStatus Status;
TInstant StatusLastSendTime;
-
+
TLocalTasks TimeToRotateCounters;
TAtomic InFlight;
-
+
TTimedMessages SendQueue;
ui32 AwakeFlags;
EWriterState State;
@@ -290,5 +290,5 @@ namespace NBus {
typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
- }
+ }
}
diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp
index 6abbf88a60..12765ab9b4 100644
--- a/library/cpp/messagebus/remote_server_session.cpp
+++ b/library/cpp/messagebus/remote_server_session.cpp
@@ -21,14 +21,14 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue,
: TBusSessionImpl(false, queue, proto, handler, config, name)
, ServerOwnedMessages(config.MaxInFlight, config.MaxInFlightBySize, "ServerOwnedMessages")
, ServerHandler(handler)
-{
+{
if (config.PerConnectionMaxInFlightBySize > 0) {
if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize)
ythrow yexception()
<< "too low PerConnectionMaxInFlightBySize value";
}
-}
-
+}
+
namespace NBus {
namespace NPrivate {
class TInvokeOnMessage: public IWorkItem {
@@ -83,7 +83,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<
JobCount.Add(workQueueTemp.GetVector()->size());
Queue->EnqueueWork(*workQueueTemp.GetVector());
}
-}
+}
void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) {
if (Y_UNLIKELY(AtomicGet(Down))) {
diff --git a/library/cpp/messagebus/remote_server_session.h b/library/cpp/messagebus/remote_server_session.h
index f5c266a7f7..c70dde00e2 100644
--- a/library/cpp/messagebus/remote_server_session.h
+++ b/library/cpp/messagebus/remote_server_session.h
@@ -1,8 +1,8 @@
-#pragma once
-
+#pragma once
+
#include "remote_server_session_semaphore.h"
#include "session_impl.h"
-
+
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance
@@ -12,7 +12,7 @@ namespace NBus {
namespace NPrivate {
class TRemoteServerSession: public TBusServerSession, public TBusSessionImpl {
friend class TRemoteServerConnection;
-
+
private:
TObjectCounter<TRemoteServerSession> ObjectCounter;
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp
index 5a5fe52894..5c0686d32a 100644
--- a/library/cpp/messagebus/scheduler/scheduler.cpp
+++ b/library/cpp/messagebus/scheduler/scheduler.cpp
@@ -1,37 +1,37 @@
-#include "scheduler.h"
-
-#include <util/datetime/base.h>
-#include <util/generic/algorithm.h>
-#include <util/generic/yexception.h>
-
+#include "scheduler.h"
+
+#include <util/datetime/base.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/yexception.h>
+
//#include "dummy_debugger.h"
using namespace NBus;
using namespace NBus::NPrivate;
-
-class TScheduleDeadlineCompare {
-public:
+
+class TScheduleDeadlineCompare {
+public:
bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept {
return i1->GetScheduleTime() > i2->GetScheduleTime();
- }
-};
-
-TScheduler::TScheduler()
+ }
+};
+
+TScheduler::TScheduler()
: StopThread(false)
, Thread([&] { this->SchedulerThread(); })
-{
-}
-
-TScheduler::~TScheduler() {
+{
+}
+
+TScheduler::~TScheduler() {
Y_VERIFY(StopThread, "state check");
-}
-
+}
+
size_t TScheduler::Size() const {
TGuard<TLock> guard(Lock);
return Items.size() + (!!NextItem ? 1 : 0);
}
-void TScheduler::Stop() {
+void TScheduler::Stop() {
{
TGuard<TLock> guard(Lock);
Y_VERIFY(!StopThread, "Scheduler already stopped");
@@ -46,28 +46,28 @@ void TScheduler::Stop() {
for (auto& item : Items) {
item.Destroy();
- }
-}
-
-void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) {
+ }
+}
+
+void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) {
TGuard<TLock> lock(Lock);
if (StopThread)
- return;
+ return;
if (!!NextItem) {
if (i->GetScheduleTime() < NextItem->GetScheduleTime()) {
DoSwap(i, NextItem);
}
- }
+ }
- Items.push_back(i);
+ Items.push_back(i);
PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
FillNextItem();
CondVar.Signal();
-}
-
+}
+
void TScheduler::FillNextItem() {
if (!NextItem && !Items.empty()) {
PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
@@ -76,22 +76,22 @@ void TScheduler::FillNextItem() {
}
}
-void TScheduler::SchedulerThread() {
+void TScheduler::SchedulerThread() {
for (;;) {
IScheduleItemAutoPtr current;
- {
+ {
TGuard<TLock> guard(Lock);
if (StopThread) {
break;
- }
+ }
if (!!NextItem) {
CondVar.WaitD(Lock, NextItem->GetScheduleTime());
} else {
CondVar.WaitI(Lock);
- }
+ }
if (StopThread) {
break;
@@ -106,7 +106,7 @@ void TScheduler::SchedulerThread() {
}
current = NextItem.Release();
- }
+ }
current->Do();
current.Destroy();
@@ -115,5 +115,5 @@ void TScheduler::SchedulerThread() {
TGuard<TLock> guard(Lock);
FillNextItem();
}
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h
index afcc0de55d..996bf30f8c 100644
--- a/library/cpp/messagebus/scheduler/scheduler.h
+++ b/library/cpp/messagebus/scheduler/scheduler.h
@@ -1,16 +1,16 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/threading/future/legacy_future.h>
#include <util/datetime/base.h>
#include <util/generic/object_counter.h>
-#include <util/generic/ptr.h>
-#include <util/generic/vector.h>
-#include <util/system/atomic.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/system/atomic.h>
#include <util/system/condvar.h>
#include <util/system/mutex.h>
-#include <util/system/thread.h>
-
+#include <util/system/thread.h>
+
namespace NBus {
namespace NPrivate {
class IScheduleItem {
@@ -25,30 +25,30 @@ namespace NBus {
private:
TInstant ScheduleTime;
};
-
+
using IScheduleItemAutoPtr = TAutoPtr<IScheduleItem>;
-
+
class TScheduler {
public:
TScheduler();
~TScheduler();
void Stop();
void Schedule(TAutoPtr<IScheduleItem> i);
-
+
size_t Size() const;
-
+
private:
void SchedulerThread();
-
+
void FillNextItem();
-
+
private:
TVector<IScheduleItemAutoPtr> Items;
IScheduleItemAutoPtr NextItem;
typedef TMutex TLock;
TLock Lock;
TCondVar CondVar;
-
+
TObjectCounter<TScheduler> ObjectCounter;
bool StopThread;
@@ -63,6 +63,6 @@ namespace NBus {
inline TInstant IScheduleItem::GetScheduleTime() const noexcept {
return ScheduleTime;
}
-
+
}
-}
+}
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index ddf9f360c4..76790221ec 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -14,7 +14,7 @@ using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NEventLoop;
-
+
namespace {
class TScheduleSession: public IScheduleItem {
public:
@@ -95,7 +95,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
: TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
, TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
, Impl(new TImpl)
- , IsSource_(isSource)
+ , IsSource_(isSource)
, Queue(queue)
, Proto(proto)
, ProtoName(Proto->GetService())
@@ -106,16 +106,16 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
, ReadEventLoop("rd-el")
, LastAcceptorId(0)
, LastConnectionId(0)
- , Down(0)
-{
+ , Down(0)
+{
Impl->DeadAcceptorStatusSummary.Summary = true;
ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop))));
WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
-}
-
+}
+
TBusSessionImpl::~TBusSessionImpl() {
Y_VERIFY(Down);
Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
@@ -160,11 +160,11 @@ void TBusSessionImpl::Shutdown() {
TGuard<TMutex> guard(ConnectionsLock);
Acceptors.clear();
}
-
+
for (auto& acceptor : acceptors) {
acceptor->Shutdown();
- }
-
+ }
+
// shutdown connections
TVector<TRemoteConnectionPtr> cs;
GetConnections(&cs);
@@ -189,12 +189,12 @@ void TBusSessionImpl::Shutdown() {
HandlerUseCountHolder.Reset();
ShutdownCompleteEvent.Signal();
-}
-
+}
+
bool TBusSessionImpl::IsDown() {
- return static_cast<bool>(AtomicGet(Down));
-}
-
+ return static_cast<bool>(AtomicGet(Down));
+}
+
size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
if (!!conn) {
@@ -202,8 +202,8 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
} else {
return 0;
}
-}
-
+}
+
void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
for (size_t i = 0; i < addrs.size(); ++i) {
@@ -427,7 +427,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
}
r.Config = Config;
-
+
TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
StatusData.StatusDumpCached = r;
}
@@ -490,7 +490,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
Y_ASSERT(q == Queue);
int actualPort = -1;
-
+
for (const TBindResult& br : bindTo) {
if (actualPort == -1) {
actualPort = br.Addr.GetPort();
@@ -502,14 +502,14 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu
}
TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
-
+
TConnectionsGuard guard(ConnectionsLock);
InsertAcceptorLockAcquired(acceptor.Get());
- }
+ }
Config.ListenPort = actualPort;
-}
-
+}
+
void TBusSessionImpl::SendSnapshotToStatusActor() {
//Y_ASSERT(ConnectionsLock.IsLocked());
@@ -604,24 +604,24 @@ void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message,
TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) {
TConnectionsGuard guard(ConnectionsLock);
-
- TAddrRemoteConnections::const_iterator it = Connections.find(addr);
- if (it != Connections.end()) {
- return it->second;
- }
-
- if (!create) {
- return TRemoteConnectionPtr();
- }
-
+
+ TAddrRemoteConnections::const_iterator it = Connections.find(addr);
+ if (it != Connections.end()) {
+ return it->second;
+ }
+
+ if (!create) {
+ return TRemoteConnectionPtr();
+ }
+
Y_VERIFY(IsSource_, "must be source");
TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
InsertConnectionLockAcquired(c.Get());
-
- return c;
-}
-
+
+ return c;
+}
+
void TBusSessionImpl::Cron() {
TVector<TRemoteConnectionPtr> connections;
GetConnections(&connections);
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h
index 90ef246ff8..d980ce6ce3 100644
--- a/library/cpp/messagebus/session_impl.h
+++ b/library/cpp/messagebus/session_impl.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "acceptor_status.h"
#include "async_result.h"
#include "event_loop.h"
@@ -9,7 +9,7 @@
#include "session_job_count.h"
#include "shutdown_state.h"
#include "ybus.h"
-
+
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/queue_in_actor.h>
#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
@@ -25,7 +25,7 @@ namespace NBus {
typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;
typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;
-
+
typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
typedef TVector<TAcceptorPtr> TAcceptorsPtrs;
@@ -34,7 +34,7 @@ namespace NBus {
TVector<TAcceptorPtr> Acceptors;
ui64 LastConnectionId;
ui64 LastAcceptorId;
-
+
TConnectionsAcceptorsSnapshot();
};
@@ -96,13 +96,13 @@ namespace NBus {
const TBusSessionConfig& config, const TString& name);
~TBusSessionImpl() override;
-
+
void Shutdown() override;
bool IsDown();
size_t GetInFlightImpl(const TNetAddr& addr) const;
size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;
-
+
void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
@@ -123,29 +123,29 @@ namespace NBus {
void StatusUpdateCachedDumpIfNecessary(TInstant now);
void Act(TStatusTag);
void Act(TConnectionTag);
-
+
TBusProtocol* GetProto() const noexcept override;
const TBusSessionConfig* GetConfig() const noexcept override;
TBusMessageQueue* GetQueue() const noexcept override;
TString GetNameInternal() override;
virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;
-
+
void Listen(int port, TBusMessageQueue* q);
void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q);
TBusConnection* Accept(SOCKET listen);
-
+
inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() {
return this;
}
inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() {
return this;
}
-
+
typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;
void SendSnapshotToStatusActor();
-
+
void InsertConnectionLockAcquired(TRemoteConnection* connection);
void InsertAcceptorLockAcquired(TAcceptor* acceptor);
@@ -159,7 +159,7 @@ namespace NBus {
TAcceptorPtr GetAcceptorById(ui64 id);
void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);
-
+
void Cron();
TBusSessionJobCount JobCount;
@@ -193,7 +193,7 @@ namespace NBus {
struct TStatusData {
TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;
-
+
TAtomicShutdownState ShutdownState;
TBusSessionStatus Status;
@@ -246,14 +246,14 @@ namespace NBus {
inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
return Proto;
}
-
+
inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept {
return &Config;
}
-
+
inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept {
return Queue;
}
-
- }
+
+ }
}
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp
index efefc87340..6743f0abe4 100644
--- a/library/cpp/messagebus/storage.cpp
+++ b/library/cpp/messagebus/storage.cpp
@@ -6,7 +6,7 @@ namespace NBus {
namespace NPrivate {
TTimedMessages::TTimedMessages() {
}
-
+
TTimedMessages::~TTimedMessages() {
Y_VERIFY(Items.empty());
}
@@ -33,14 +33,14 @@ namespace NBus {
size_t TTimedMessages::Size() const {
return Items.size();
}
-
+
void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
Clear(r);
return;
}
-
+
while (!Items.empty()) {
TItem& i = *Items.front();
if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
@@ -50,14 +50,14 @@ namespace NBus {
Items.pop_front();
}
}
-
+
void TTimedMessages::Clear(TMessagesPtrs* r) {
while (!Items.empty()) {
r->push_back(Items.front()->Message.Release());
Items.pop_front();
}
}
-
+
TSyncAckMessages::TSyncAckMessages() {
KeyToMessage.set_empty_key(0);
KeyToMessage.set_deleted_key(1);
@@ -66,14 +66,14 @@ namespace NBus {
TSyncAckMessages::~TSyncAckMessages() {
Y_VERIFY(KeyToMessage.empty());
Y_VERIFY(TimedItems.empty());
- }
-
+ }
+
void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
// Perform garbage collection if `TimedMessages` contain too many junk data
if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
Gc();
}
-
+
TValue value = {m.MessagePtr.Release()};
std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
@@ -95,7 +95,7 @@ namespace NBus {
return v.Message;
}
-
+
void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
@@ -110,7 +110,7 @@ namespace NBus {
if (TInstant::MilliSeconds(i.SendTime) > before) {
break;
}
-
+
TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
if (itMessage != KeyToMessage.end()) {
@@ -133,7 +133,7 @@ namespace NBus {
void TSyncAckMessages::Gc() {
TDeque<TTimedItem> tmp;
-
+
for (auto& timedItem : TimedItems) {
if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
continue;
@@ -143,7 +143,7 @@ namespace NBus {
TimedItems.swap(tmp);
}
-
+
void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
for (auto message : messages) {
TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
@@ -158,4 +158,4 @@ namespace NBus {
}
}
-}
+}
diff --git a/library/cpp/messagebus/storage.h b/library/cpp/messagebus/storage.h
index 7d168844ed..3f8de480a1 100644
--- a/library/cpp/messagebus/storage.h
+++ b/library/cpp/messagebus/storage.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "message_ptr_and_header.h"
#include "moved.h"
#include "ybus.h"
@@ -18,7 +18,7 @@ namespace NBus {
public:
TTimedMessages();
~TTimedMessages();
-
+
struct TItem {
THolder<TBusMessage> Message;
@@ -36,31 +36,31 @@ namespace NBus {
void Timeout(TInstant before, TMessagesPtrs* r);
void Clear(TMessagesPtrs* r);
-
+
private:
TItems Items;
};
-
+
class TSyncAckMessages : TNonCopyable {
public:
TSyncAckMessages();
~TSyncAckMessages();
-
+
void Push(TBusMessagePtrAndHeader& m);
TBusMessage* Pop(TBusKey id);
void Timeout(TInstant before, TMessagesPtrs* r);
void Clear(TMessagesPtrs* r);
-
+
size_t Size() const {
return KeyToMessage.size();
}
-
+
void RemoveAll(const TMessagesPtrs&);
-
+
void Gc();
-
+
void DumpState();
private:
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index 8489319278..e4ea37f0f3 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -605,10 +605,10 @@ int main(int argc, char* argv[]) {
opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
- opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
- opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
- opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
- opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
+ opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
+ opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
+ opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
+ opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
.RequiredArgument("BOOL")
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..a358339513 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -1,5 +1,5 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/fixed_port.h>
#include <library/cpp/messagebus/test/helper/hanging_server.h>
@@ -13,7 +13,7 @@
#include <utility>
using namespace NBus;
-using namespace NBus::NTest;
+using namespace NBus::NTest;
namespace {
struct TExampleClientSlowOnMessageSent: public TExampleClient {
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h
index de21ad8521..bb21cf1c99 100644
--- a/library/cpp/messagebus/ybus.h
+++ b/library/cpp/messagebus/ybus.h
@@ -193,7 +193,7 @@ namespace NBus {
void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session);
void Remove(TBusSession* session);
};
-
+
/////////////////////////////////////////////////////////////////
/// Factory methods to construct message queue
TBusMessageQueuePtr CreateMessageQueue(const char* name = "");
diff --git a/library/cpp/monlib/service/service.h b/library/cpp/monlib/service/service.h
index 2f66dddaf8..ccf09fefab 100644
--- a/library/cpp/monlib/service/service.h
+++ b/library/cpp/monlib/service/service.h
@@ -5,7 +5,7 @@
#include <library/cpp/http/fetch/httpheader.h>
#include <library/cpp/http/server/http.h>
#include <library/cpp/logger/all.h>
-
+
#include <util/network/ip.h>
#include <library/cpp/cgiparam/cgiparam.h>
diff --git a/library/cpp/packedtypes/packed.h b/library/cpp/packedtypes/packed.h
index 88cff26ae2..a7386b7828 100644
--- a/library/cpp/packedtypes/packed.h
+++ b/library/cpp/packedtypes/packed.h
@@ -5,8 +5,8 @@
#include <util/stream/output.h>
#include <util/ysaveload.h>
-#include "longs.h"
-
+#include "longs.h"
+
struct Stream_traits {
template <typename T>
static T get(IInputStream& in) {
diff --git a/library/cpp/packedtypes/ya.make b/library/cpp/packedtypes/ya.make
index 4c2c950619..8af6d0eab6 100644
--- a/library/cpp/packedtypes/ya.make
+++ b/library/cpp/packedtypes/ya.make
@@ -1,21 +1,21 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
akhropov
velavokr
)
-PEERDIR(
+PEERDIR(
library/cpp/streams/zc_memory_input
-)
-
-SRCS(
- fixed_point.h
+)
+
+SRCS(
+ fixed_point.h
longs.cpp
- packed.h
- packedfloat.cpp
- packedfloat.h
+ packed.h
+ packedfloat.cpp
+ packedfloat.h
zigzag.h
-)
-
-END()
+)
+
+END()
diff --git a/library/cpp/sighandler/ya.make b/library/cpp/sighandler/ya.make
index c0f7ea6084..3fd5244787 100644
--- a/library/cpp/sighandler/ya.make
+++ b/library/cpp/sighandler/ya.make
@@ -1,10 +1,10 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(pg)
-
-SRCS(
- async_signals_handler.cpp
- async_signals_handler.h
-)
-
-END()
+
+SRCS(
+ async_signals_handler.cpp
+ async_signals_handler.h
+)
+
+END()
diff --git a/library/cpp/string_utils/url/url.cpp b/library/cpp/string_utils/url/url.cpp
index 85f4ac5d69..988122e5d1 100644
--- a/library/cpp/string_utils/url/url.cpp
+++ b/library/cpp/string_utils/url/url.cpp
@@ -312,28 +312,28 @@ TStringBuf CutMPrefix(const TStringBuf url) noexcept {
static inline bool IsSchemeChar(char c) noexcept {
return IsAsciiAlnum(c); //what about '+' ?..
-}
-
+}
+
static bool HasPrefix(const TStringBuf url) noexcept {
TStringBuf scheme, unused;
if (!url.TrySplit(TStringBuf("://"), scheme, unused))
- return false;
-
+ return false;
+
return AllOf(scheme, IsSchemeChar);
-}
-
+}
+
TString AddSchemePrefix(const TString& url) {
return AddSchemePrefix(url, TStringBuf("http"));
}
TString AddSchemePrefix(const TString& url, TStringBuf scheme) {
- if (HasPrefix(url)) {
- return url;
- }
-
+ if (HasPrefix(url)) {
+ return url;
+ }
+
return TString::Join(scheme, TStringBuf("://"), url);
-}
-
+}
+
#define X(c) (c >= 'A' ? ((c & 0xdf) - 'A') + 10 : (c - '0'))
static inline int x2c(unsigned char* x) {
diff --git a/library/cpp/string_utils/url/url.h b/library/cpp/string_utils/url/url.h
index 84137ccc57..c5c550ea6b 100644
--- a/library/cpp/string_utils/url/url.h
+++ b/library/cpp/string_utils/url/url.h
@@ -56,10 +56,10 @@ TWtringBuf CutHttpPrefix(const TWtringBuf url, bool ignorehttps = false) noexcep
Y_PURE_FUNCTION
TStringBuf CutSchemePrefix(const TStringBuf url) noexcept;
-//! adds specified scheme prefix if URL has no scheme
-//! @note if URL has scheme prefix already the function returns unchanged URL
+//! adds specified scheme prefix if URL has no scheme
+//! @note if URL has scheme prefix already the function returns unchanged URL
TString AddSchemePrefix(const TString& url, const TStringBuf scheme);
-
+
//! Same as `AddSchemePrefix(url, "http")`.
TString AddSchemePrefix(const TString& url);
diff --git a/library/cpp/string_utils/url/url_ut.cpp b/library/cpp/string_utils/url/url_ut.cpp
index 1588013893..02a0de9e5f 100644
--- a/library/cpp/string_utils/url/url_ut.cpp
+++ b/library/cpp/string_utils/url/url_ut.cpp
@@ -69,14 +69,14 @@ Y_UNIT_TEST_SUITE(TUtilUrlTest) {
UNIT_ASSERT_VALUES_EQUAL("FHFBN", GetZone("ya.FHFBN"));
UNIT_ASSERT_VALUES_EQUAL("", GetZone(""));
}
-
+
Y_UNIT_TEST(TestAddSchemePrefix) {
- UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("yandex.ru"));
- UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("http://yandex.ru"));
- UNIT_ASSERT_VALUES_EQUAL("https://yandex.ru", AddSchemePrefix("https://yandex.ru"));
- UNIT_ASSERT_VALUES_EQUAL("file://yandex.ru", AddSchemePrefix("file://yandex.ru"));
- UNIT_ASSERT_VALUES_EQUAL("ftp://ya.ru", AddSchemePrefix("ya.ru", "ftp"));
- }
+ UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("yandex.ru"));
+ UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("http://yandex.ru"));
+ UNIT_ASSERT_VALUES_EQUAL("https://yandex.ru", AddSchemePrefix("https://yandex.ru"));
+ UNIT_ASSERT_VALUES_EQUAL("file://yandex.ru", AddSchemePrefix("file://yandex.ru"));
+ UNIT_ASSERT_VALUES_EQUAL("ftp://ya.ru", AddSchemePrefix("ya.ru", "ftp"));
+ }
Y_UNIT_TEST(TestSchemeGet) {
UNIT_ASSERT_VALUES_EQUAL("http://", GetSchemePrefix("http://ya.ru/bebe"));
diff --git a/library/cpp/testing/unittest/utmain.cpp b/library/cpp/testing/unittest/utmain.cpp
index 305bc6b40f..a037e84c55 100644
--- a/library/cpp/testing/unittest/utmain.cpp
+++ b/library/cpp/testing/unittest/utmain.cpp
@@ -742,13 +742,13 @@ int NUnitTest::RunMain(int argc, char** argv) {
processor.Enable(name);
}
}
- }
+ }
if (listTests != DONT_LIST) {
return DoList(listTests == LIST_VERBOSE, *listStream);
}
TTestFactory::Instance().SetProcessor(&processor);
-
+
unsigned ret;
for (;;) {
ret = TTestFactory::Instance().Execute();
diff --git a/library/cpp/threading/future/async.h b/library/cpp/threading/future/async.h
index 8543fdd5c6..0f3a741372 100644
--- a/library/cpp/threading/future/async.h
+++ b/library/cpp/threading/future/async.h
@@ -20,10 +20,10 @@ namespace NThreading {
template <typename Func>
TFuture<TFutureType<TFunctionResult<Func>>> Async(Func&& func, IThreadPool& queue) {
auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>();
- auto lambda = [promise, func = std::forward<Func>(func)]() mutable {
+ auto lambda = [promise, func = std::forward<Func>(func)]() mutable {
NImpl::SetValue(promise, func);
- };
- queue.SafeAddFunc(std::move(lambda));
+ };
+ queue.SafeAddFunc(std::move(lambda));
return promise.GetFuture();
}
diff --git a/library/cpp/unicode/punycode/punycode_ut.cpp b/library/cpp/unicode/punycode/punycode_ut.cpp
index 97271cf0d8..6724d828c8 100644
--- a/library/cpp/unicode/punycode/punycode_ut.cpp
+++ b/library/cpp/unicode/punycode/punycode_ut.cpp
@@ -3,19 +3,19 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/charset/wide.h>
-namespace {
- template<typename T1, typename T2>
- inline bool HasSameBuffer(const T1& s1, const T2& s2) {
- return s1.begin() == s2.begin();
- }
-}
-
+namespace {
+ template<typename T1, typename T2>
+ inline bool HasSameBuffer(const T1& s1, const T2& s2) {
+ return s1.begin() == s2.begin();
+ }
+}
+
Y_UNIT_TEST_SUITE(TPunycodeTest) {
static bool TestRaw(const TString& utf8, const TString& punycode) {
TUtf16String unicode = UTF8ToWide(utf8);
TString buf1;
TUtf16String buf2;
- return HasSameBuffer(WideToPunycode(unicode, buf1), buf1) && buf1 == punycode && HasSameBuffer(PunycodeToWide(punycode, buf2), buf2) && buf2 == unicode && WideToPunycode(unicode) == punycode && PunycodeToWide(punycode) == unicode;
+ return HasSameBuffer(WideToPunycode(unicode, buf1), buf1) && buf1 == punycode && HasSameBuffer(PunycodeToWide(punycode, buf2), buf2) && buf2 == unicode && WideToPunycode(unicode) == punycode && PunycodeToWide(punycode) == unicode;
}
Y_UNIT_TEST(RawEncodeDecode) {
diff --git a/library/cpp/yson_pull/scalar.h b/library/cpp/yson_pull/scalar.h
index 509fce8b5e..e1ea5d41cb 100644
--- a/library/cpp/yson_pull/scalar.h
+++ b/library/cpp/yson_pull/scalar.h
@@ -26,7 +26,7 @@ namespace NYsonPull {
size_t Size;
};
- ui8 AsNothing[1];
+ ui8 AsNothing[1];
bool AsBoolean;
i64 AsInt64;
ui64 AsUInt64;
diff --git a/library/python/symbols/libc/ya.make b/library/python/symbols/libc/ya.make
index 7b84cbc961..e20d063505 100644
--- a/library/python/symbols/libc/ya.make
+++ b/library/python/symbols/libc/ya.make
@@ -6,7 +6,7 @@ PEERDIR(
library/python/symbols/registry
)
-IF (GCC OR CLANG)
+IF (GCC OR CLANG)
CFLAGS(
-Wno-deprecated-declarations # For sem_getvalue.
)