diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:49 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:49 +0300 |
commit | 7489e4682331202b9c7d863c0898eb83d7b12c2b (patch) | |
tree | 9142afc54d335ea52910662635b898e79e192e49 /library | |
parent | a5950576e397b1909261050b8c7da16db58f10b1 (diff) | |
download | ydb-7489e4682331202b9c7d863c0898eb83d7b12c2b.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
51 files changed, 786 insertions, 786 deletions
diff --git a/library/cpp/archive/ya.make b/library/cpp/archive/ya.make index 067daad0dd..65d36479ef 100644 --- a/library/cpp/archive/ya.make +++ b/library/cpp/archive/ya.make @@ -1,12 +1,12 @@ -LIBRARY() - -OWNER(pg) - -SRCS( - yarchive.cpp - yarchive.h +LIBRARY() + +OWNER(pg) + +SRCS( + yarchive.cpp + yarchive.h directory_models_archive_reader.cpp directory_models_archive_reader.h -) - -END() +) + +END() diff --git a/library/cpp/blockcodecs/core/stream.cpp b/library/cpp/blockcodecs/core/stream.cpp index c4b4865245..4f7db3c32b 100644 --- a/library/cpp/blockcodecs/core/stream.cpp +++ b/library/cpp/blockcodecs/core/stream.cpp @@ -186,10 +186,10 @@ size_t TDecodedInput::DoUnboundedNext(const void** ptr) { return 0; } - if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) { + if (Y_UNLIKELY(blockLen > 1024 * 1024 * 1024)) { ythrow yexception() << "block size exceeds 1 GiB"; - } - + } + TBuffer block; block.Resize(blockLen); diff --git a/library/cpp/cache/ya.make b/library/cpp/cache/ya.make index 85f6127060..fd73032bf8 100644 --- a/library/cpp/cache/ya.make +++ b/library/cpp/cache/ya.make @@ -1,15 +1,15 @@ -LIBRARY() - +LIBRARY() + OWNER( g:util ) - -SRCS( - cache.cpp + +SRCS( + cache.cpp thread_safe_cache.cpp -) - -END() +) + +END() RECURSE_FOR_TESTS( ut diff --git a/library/cpp/cgiparam/cgiparam.h b/library/cpp/cgiparam/cgiparam.h index 1da4452adf..87d1ab0ad4 100644 --- a/library/cpp/cgiparam/cgiparam.h +++ b/library/cpp/cgiparam/cgiparam.h @@ -87,21 +87,21 @@ public: void InsertEscaped(const TStringBuf name, const TStringBuf value); -#if !defined(__GLIBCXX__) +#if !defined(__GLIBCXX__) template <typename TName, typename TValue> inline void InsertUnescaped(TName&& name, TValue&& value) { - // TStringBuf use as TName or TValue is C++17 actually. - // There is no pair constructor available in C++14 when required type - // is not implicitly constructible from given type. - // But libc++ pair allows this with C++14. + // TStringBuf use as TName or TValue is C++17 actually. + // There is no pair constructor available in C++14 when required type + // is not implicitly constructible from given type. + // But libc++ pair allows this with C++14. emplace(std::forward<TName>(name), std::forward<TValue>(value)); } -#else - template <typename TName, typename TValue> - inline void InsertUnescaped(TName&& name, TValue&& value) { - emplace(TString(name), TString(value)); - } -#endif +#else + template <typename TName, typename TValue> + inline void InsertUnescaped(TName&& name, TValue&& value) { + emplace(TString(name), TString(value)); + } +#endif // replace all values for a given key with new values template <typename TIter> diff --git a/library/cpp/comptable/usage/usage.cpp b/library/cpp/comptable/usage/usage.cpp index b15d9c891d..9997c83686 100644 --- a/library/cpp/comptable/usage/usage.cpp +++ b/library/cpp/comptable/usage/usage.cpp @@ -3,9 +3,9 @@ #include <util/random/random.h> #include <util/random/fast.h> -#include <time.h> -#include <stdlib.h> - +#include <time.h> +#include <stdlib.h> + using namespace NCompTable; template <bool HQ> diff --git a/library/cpp/containers/comptrie/comptrie_ut.cpp b/library/cpp/containers/comptrie/comptrie_ut.cpp index a5b2ea248a..74bee09b5d 100644 --- a/library/cpp/containers/comptrie/comptrie_ut.cpp +++ b/library/cpp/containers/comptrie/comptrie_ut.cpp @@ -862,14 +862,14 @@ void TCompactTrieTest::TestMergeFromFile() { { TCompactTrieBuilder<> b; - UNIT_ASSERT(b.AddSubtreeInFile("com.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-com")); + UNIT_ASSERT(b.AddSubtreeInFile("com.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-com")); UNIT_ASSERT(b.Add("org.kernel", 22)); - UNIT_ASSERT(b.AddSubtreeInFile("ru.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-ru")); + UNIT_ASSERT(b.AddSubtreeInFile("ru.", GetSystemTempDir() + "/TCompactTrieTest-TestMerge-ru")); TUnbufferedFileOutput out(GetSystemTempDir() + "/TCompactTrieTest-TestMerge-res"); b.Save(out); } - TCompactTrie<> trie(TBlob::FromFileSingleThreaded(GetSystemTempDir() + "/TCompactTrieTest-TestMerge-res")); + TCompactTrie<> trie(TBlob::FromFileSingleThreaded(GetSystemTempDir() + "/TCompactTrieTest-TestMerge-res")); UNIT_ASSERT_VALUES_EQUAL(12u, trie.Get("ru.yandex")); UNIT_ASSERT_VALUES_EQUAL(13u, trie.Get("ru.google")); UNIT_ASSERT_VALUES_EQUAL(14u, trie.Get("ru.mail")); diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp index 82809e63de..8b372496a2 100644 --- a/library/cpp/coroutine/engine/coroutine_ut.cpp +++ b/library/cpp/coroutine/engine/coroutine_ut.cpp @@ -509,7 +509,7 @@ namespace NCoroWaitWakeLivelockBug { struct TSubState { TSubState(TState& parent, ui32 self) : Parent(parent) - , Name(TStringBuilder() << "Sub" << self) + , Name(TStringBuilder() << "Sub" << self) , Self(self) { UNIT_ASSERT(self < 2); diff --git a/library/cpp/dbg_output/dump.h b/library/cpp/dbg_output/dump.h index 99da7e5802..c7efa105ee 100644 --- a/library/cpp/dbg_output/dump.h +++ b/library/cpp/dbg_output/dump.h @@ -97,10 +97,10 @@ namespace NPrivate { template <class T, class TColorScheme = DBG_OUTPUT_DEFAULT_COLOR_SCHEME> static inline ::NPrivate::TDbgDump<T, ::NPrivate::TTraitsShallow<TColorScheme>> DbgDump(const T& t) { - return {std::addressof(t)}; + return {std::addressof(t)}; } template <class T, class TColorScheme = DBG_OUTPUT_DEFAULT_COLOR_SCHEME> static inline ::NPrivate::TDbgDump<T, ::NPrivate::TTraitsDeep<TColorScheme>> DbgDumpDeep(const T& t) { - return {std::addressof(t)}; + return {std::addressof(t)}; } diff --git a/library/cpp/deprecated/mapped_file/mapped_file.cpp b/library/cpp/deprecated/mapped_file/mapped_file.cpp index 02dbe5d62e..b0e4511299 100644 --- a/library/cpp/deprecated/mapped_file/mapped_file.cpp +++ b/library/cpp/deprecated/mapped_file/mapped_file.cpp @@ -10,16 +10,16 @@ TMappedFile::TMappedFile(TFileMap* map, const char* dbgName) { i64 len = Map_->Length(); if (Hi32(len) != 0 && sizeof(size_t) <= sizeof(ui32)) ythrow yexception() << "File '" << dbgName << "' mapping error: " << len << " too large"; - + Map_->Map(0, static_cast<size_t>(len)); -} - +} + TMappedFile::TMappedFile(const TFile& file, TFileMap::EOpenMode om, const char* dbgName) : Map_(nullptr) -{ +{ init(file, om, dbgName); -} - +} + void TMappedFile::precharge(size_t off, size_t size) const { if (!Map_) return; @@ -39,24 +39,24 @@ void TMappedFile::init(const TString& name, size_t length, TFileMap::EOpenMode o THolder<TFileMap> map(new TFileMap(name, length, om)); TMappedFile newFile(map.Get(), name.data()); Y_UNUSED(map.Release()); - newFile.swap(*this); - newFile.term(); -} + newFile.swap(*this); + newFile.term(); +} void TMappedFile::init(const TFile& file, TFileMap::EOpenMode om, const char* dbgName) { THolder<TFileMap> map(new TFileMap(file, om)); TMappedFile newFile(map.Get(), dbgName); Y_UNUSED(map.Release()); - newFile.swap(*this); - newFile.term(); + newFile.swap(*this); + newFile.term(); } void TMappedFile::init(const TString& name, TFileMap::EOpenMode om) { THolder<TFileMap> map(new TFileMap(name, om)); TMappedFile newFile(map.Get(), name.data()); Y_UNUSED(map.Release()); - newFile.swap(*this); - newFile.term(); + newFile.swap(*this); + newFile.term(); } void TMappedFile::flush() { diff --git a/library/cpp/deprecated/mapped_file/mapped_file.h b/library/cpp/deprecated/mapped_file/mapped_file.h index 93fd9cf000..45859ed65a 100644 --- a/library/cpp/deprecated/mapped_file/mapped_file.h +++ b/library/cpp/deprecated/mapped_file/mapped_file.h @@ -18,9 +18,9 @@ class TMappedFile { private: TFileMap* Map_; -private: +private: TMappedFile(TFileMap* map, const char* dbgName); - + public: TMappedFile() { Map_ = nullptr; @@ -36,11 +36,11 @@ public: } TMappedFile(const TFile& file, TFileMap::EOpenMode om = TFileMap::oRdOnly, const char* dbgName = "unknown"); - + void init(const TString& name); void init(const TString& name, TFileMap::EOpenMode om); - + void init(const TString& name, size_t length, TFileMap::EOpenMode om); void init(const TFile&, TFileMap::EOpenMode om = TFileMap::oRdOnly, const char* dbgName = "unknown"); @@ -65,8 +65,8 @@ public: } void precharge(size_t pos = 0, size_t size = (size_t)-1) const; - + void swap(TMappedFile& file) noexcept { DoSwap(Map_, file.Map_); - } + } }; diff --git a/library/cpp/digest/old_crc/crc.h b/library/cpp/digest/old_crc/crc.h index 0b99a45a7d..4a3ce6d05e 100644 --- a/library/cpp/digest/old_crc/crc.h +++ b/library/cpp/digest/old_crc/crc.h @@ -79,11 +79,11 @@ namespace NCrcPrivate { #undef DEF_CRC_FUNC } -template <class T> +template <class T> static inline T Crc(const void* buf, size_t len, T init) { return (T)NCrcPrivate::TCrcHelper<8 * sizeof(T)>::Crc(buf, len, init); -} - +} + template <class T> static inline T Crc(const void* buf, size_t len) { return Crc<T>(buf, len, (T)NCrcPrivate::TCrcHelper<8 * sizeof(T)>::Init); diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index fbd127a652..128583bdd7 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -1,4 +1,4 @@ -#include "http.h" +#include "http.h" #include "http_ex.h" #include <library/cpp/threading/equeue/equeue.h> @@ -7,25 +7,25 @@ #include <util/generic/cast.h> #include <util/generic/intrlist.h> #include <util/generic/yexception.h> -#include <util/network/address.h> +#include <util/network/address.h> #include <util/network/socket.h> -#include <util/network/poller.h> -#include <util/system/atomic.h> -#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy -#include <util/system/defaults.h> -#include <util/system/event.h> -#include <util/system/mutex.h> +#include <util/network/poller.h> +#include <util/system/atomic.h> +#include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy +#include <util/system/defaults.h> +#include <util/system/event.h> +#include <util/system/mutex.h> #include <util/system/pipe.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/thread/factory.h> -#include <cerrno> -#include <cstring> -#include <ctime> +#include <cerrno> +#include <cstring> +#include <ctime> + +#include <sys/stat.h> +#include <sys/types.h> -#include <sys/stat.h> -#include <sys/types.h> - using namespace NAddr; namespace { @@ -51,18 +51,18 @@ namespace { } class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> { -public: +public: TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef); ~TClientConnection() override; void OnPollEvent(TInstant now) override; inline void Activate(TInstant now) noexcept; - inline void DeActivate(); + inline void DeActivate(); inline void Reject(); -public: - TSocket Socket_; +public: + TSocket Socket_; NAddr::IRemoteAddrRef ListenerSockAddrRef_; THttpServer::TImpl* HttpServ_ = nullptr; bool Reject_ = false; @@ -72,70 +72,70 @@ public: }; class THttpServer::TImpl { -public: - class TConnections { +public: + class TConnections { public: inline TConnections(TSocketPoller* poller, const THttpServerOptions& options) - : Poller_(poller) + : Poller_(poller) , Options(options) - { - } + { + } inline ~TConnections() { - } + } inline void Add(TClientConnection* c) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); - Conns_.PushBack(c); - Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); - } + Conns_.PushBack(c); + Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c)); + } inline void Erase(TClientConnection* c, TInstant now) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); EraseUnsafe(c); if (Options.ExpirationTimeout > TDuration::Zero()) { TryRemovingUnsafe(now - Options.ExpirationTimeout); } - } + } inline void Clear() noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); - Conns_.Clear(); - } + Conns_.Clear(); + } inline bool RemoveOld(TInstant border) noexcept { - TGuard<TMutex> g(Mutex_); + TGuard<TMutex> g(Mutex_); return TryRemovingUnsafe(border); } bool TryRemovingUnsafe(TInstant border) noexcept { - if (Conns_.Empty()) { + if (Conns_.Empty()) { return false; - } - TClientConnection* c = &*(Conns_.Begin()); + } + TClientConnection* c = &*(Conns_.Begin()); if (c->LastUsed > border) { return false; } EraseUnsafe(c); - delete c; + delete c; return true; - } + } void EraseUnsafe(TClientConnection* c) noexcept { Poller_->Unwait(c->Socket_); c->Unlink(); } - public: - TMutex Mutex_; - TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; + public: + TMutex Mutex_; + TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_; TSocketPoller* Poller_ = nullptr; const THttpServerOptions& Options; - }; + }; - static void* ListenSocketFunction(void* param) { + static void* ListenSocketFunction(void* param) { try { ((TImpl*)param)->ListenSocket(); } catch (...) { @@ -143,19 +143,19 @@ public: } return nullptr; - } + } - TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { - THolder<TClientRequest> obj(Cb_->CreateClient()); + TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) { + THolder<TClientRequest> obj(Cb_->CreateClient()); - obj->Conn_.Reset(c.Release()); + obj->Conn_.Reset(c.Release()); + + return obj; + } - return obj; - } - void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) { if (MaxRequestsReached()) { - Cb_->OnMaxConn(); + Cb_->OnMaxConn(); bool wasRemoved = Connections->RemoveOld(TInstant::Max()); if (!wasRemoved && Options_.RejectExcessConnections) { (new TClientConnection(s, this, listenerSockAddrRef))->Reject(); @@ -166,27 +166,27 @@ public: auto connection = new TClientConnection(s, this, listenerSockAddrRef); connection->LastUsed = now; connection->DeActivate(); - } + } - void SaveErrorCode() { + void SaveErrorCode() { ErrorCode = WSAGetLastError(); - } + } int GetErrorCode() const { return ErrorCode; - } + } const char* GetError() const { return LastSystemErrorText(ErrorCode); - } + } - bool Start() { - Poller.Reset(new TSocketPoller()); + bool Start() { + Poller.Reset(new TSocketPoller()); Connections.Reset(new TConnections(Poller.Get(), Options_)); - + // Start the listener thread - ListenerRunningOK = false; - + ListenerRunningOK = false; + // throws on error TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd); @@ -195,31 +195,31 @@ public: Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble); - ListenStartEvent.Reset(); - try { + ListenStartEvent.Reset(); + try { ListenThread.Reset(new TThread(ListenSocketFunction, this)); ListenThread->Start(); - } catch (const yexception&) { - SaveErrorCode(); - return false; + } catch (const yexception&) { + SaveErrorCode(); + return false; } // Wait until the thread has completely started and return the success indicator - ListenStartEvent.Wait(); + ListenStartEvent.Wait(); - return ListenerRunningOK; - } + return ListenerRunningOK; + } - void Wait() { - Cb_->OnWait(); + void Wait() { + Cb_->OnWait(); TGuard<TMutex> g(StopMutex); if (ListenThread) { ListenThread->Join(); ListenThread.Reset(nullptr); } - } + } - void Stop() { + void Stop() { Shutdown(); TGuard<TMutex> g(StopMutex); @@ -228,19 +228,19 @@ public: ListenThread.Reset(nullptr); } - while (ConnectionCount) { - usleep(10000); - Connections->Clear(); + while (ConnectionCount) { + usleep(10000); + Connections->Clear(); } - Connections.Destroy(); - Poller.Destroy(); - } + Connections.Destroy(); + Poller.Destroy(); + } - void Shutdown() { + void Shutdown() { ListenWakeupWriteFd.Write("", 1); // ignore result - } + } void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { struct TFailRequest: public THttpClientRequestEx { @@ -257,20 +257,20 @@ public: ProcessFailRequest(0); return true; } - }; + }; if (!fail && Requests->Add(req.Get())) { Y_UNUSED(req.Release()); - } else { - req = new TFailRequest(req); + } else { + req = new TFailRequest(req); - if (FailRequests->Add(req.Get())) { + if (FailRequests->Add(req.Get())) { Y_UNUSED(req.Release()); } else { - Cb_->OnFailRequest(-1); + Cb_->OnFailRequest(-1); } } - } + } size_t GetRequestQueueSize() const { return Requests->Size(); @@ -305,8 +305,8 @@ public: if (s == INVALID_SOCKET) { ythrow yexception() << "accept: " << LastSystemErrorText(); - } - + } + Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_); } @@ -318,13 +318,13 @@ public: TSocket S_; TImpl* Server_ = nullptr; NAddr::IRemoteAddrRef SockAddrRef_; - }; + }; - void ListenSocket() { + void ListenSocket() { TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str()); ErrorCode = 0; - TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; + TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs; std::function<void(TSocket)> callback = [&](TSocket socket) { THolder<TListenSocket> ls(new TListenSocket(socket, this)); @@ -337,65 +337,65 @@ public: ListenStartEvent.Signal(); return; - } + } - Requests->Start(Options_.nThreads, Options_.MaxQueueSize); - FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); - Cb_->OnListenStart(); - ListenerRunningOK = true; - ListenStartEvent.Signal(); + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize); + Cb_->OnListenStart(); + ListenerRunningOK = true; + ListenStartEvent.Signal(); TVector<void*> events; - events.resize(1); + events.resize(1); TInstant now = TInstant::Now(); for (;;) { - try { + try { const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout; const size_t ret = Poller->WaitD(events.data(), events.size(), deadline); now = TInstant::Now(); - for (size_t i = 0; i < ret; ++i) { + for (size_t i = 0; i < ret; ++i) { ((IPollAble*)events[i])->OnPollEvent(now); - } + } if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) { Connections->RemoveOld(now - Options_.ExpirationTimeout); } // When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call - // RemoveOld and destroy other IPollAble* objects in the - // poller. Thus in this case we can safely process only one - // event from the poller at a time. + // RemoveOld and destroy other IPollAble* objects in the + // poller. Thus in this case we can safely process only one + // event from the poller at a time. if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) { if (ret >= events.size()) { - events.resize(ret * 2); + events.resize(ret * 2); } } } catch (const TShouldStop&) { break; - } catch (...) { - Cb_->OnException(); + } catch (...) { + Cb_->OnException(); } } - while (!Reqs.Empty()) { - THolder<TListenSocket> ls(Reqs.PopFront()); + while (!Reqs.Empty()) { + THolder<TListenSocket> ls(Reqs.PopFront()); - Poller->Unwait(ls->GetSocket()); + Poller->Unwait(ls->GetSocket()); } - Requests->Stop(); - FailRequests->Stop(); - Cb_->OnListenStop(); - } + Requests->Stop(); + FailRequests->Stop(); + Cb_->OnListenStop(); + } - void RestartRequestThreads(ui32 nTh, ui32 maxQS) { - Requests->Stop(); - Options_.nThreads = nTh; - Options_.MaxQueueSize = maxQS; - Requests->Start(Options_.nThreads, Options_.MaxQueueSize); - } + void RestartRequestThreads(ui32 nTh, ui32 maxQS) { + Requests->Stop(); + Options_.nThreads = nTh; + Options_.MaxQueueSize = maxQS; + Requests->Start(Options_.nThreads, Options_.MaxQueueSize); + } TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_) : Requests(mainWorkers) @@ -415,29 +415,29 @@ public: options) { } - ~TImpl() { - try { - Stop(); - } catch (...) { + ~TImpl() { + try { + Stop(); + } catch (...) { } - } + } inline const TOptions& Options() const noexcept { - return Options_; - } + return Options_; + } inline void DecreaseConnections() noexcept { - AtomicDecrement(ConnectionCount); - } + AtomicDecrement(ConnectionCount); + } inline void IncreaseConnections() noexcept { - AtomicIncrement(ConnectionCount); - } - - inline i64 GetClientCount() const { - return AtomicGet(ConnectionCount); - } - + AtomicIncrement(ConnectionCount); + } + + inline i64 GetClientCount() const { + return AtomicGet(ConnectionCount); + } + inline bool MaxRequestsReached() const { return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections); } @@ -449,11 +449,11 @@ public: TMtpQueueRef Requests; TMtpQueueRef FailRequests; TAtomic ConnectionCount = 0; - THolder<TSocketPoller> Poller; - THolder<TConnections> Connections; + THolder<TSocketPoller> Poller; + THolder<TConnections> Connections; bool ListenerRunningOK = false; int ErrorCode = 0; - TOptions Options_; + TOptions Options_; ICallBack* Cb_ = nullptr; THttpServer* Parent_ = nullptr; TWakeupPollAble WakeupPollAble; @@ -571,7 +571,7 @@ TClientConnection::~TClientConnection() { } void TClientConnection::OnPollEvent(TInstant now) { - THolder<TClientConnection> this_(this); + THolder<TClientConnection> this_(this); Activate(now); { @@ -588,7 +588,7 @@ void TClientConnection::OnPollEvent(TInstant now) { } } - THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); + THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_)); AcceptMoment = now; HttpServ_->AddRequest(obj, Reject_); @@ -601,7 +601,7 @@ void TClientConnection::Activate(TInstant now) noexcept { } void TClientConnection::DeActivate() { - HttpServ_->Connections->Add(this); + HttpServ_->Connections->Add(this); } void TClientConnection::Reject() { @@ -677,7 +677,7 @@ void TClientRequest::ResetConnection() { } void TClientRequest::Process(void* ThreadSpecificResource) { - THolder<TClientRequest> this_(this); + THolder<TClientRequest> this_(this); auto* serverImpl = Conn_->HttpServ_; diff --git a/library/cpp/http/server/http.h b/library/cpp/http/server/http.h index d0bc92ff7d..b292d38f27 100644 --- a/library/cpp/http/server/http.h +++ b/library/cpp/http/server/http.h @@ -15,75 +15,75 @@ class TClientRequest; class TClientConnection; class THttpServer { - friend class TClientRequest; - friend class TClientConnection; + friend class TClientRequest; + friend class TClientConnection; -public: - class ICallBack { +public: + class ICallBack { public: - struct TFailLogData { - int failstate; + struct TFailLogData { + int failstate; TString url; - }; + }; - virtual ~ICallBack() { - } + virtual ~ICallBack() { + } - virtual void OnFailRequest(int /*failstate*/) { - } + virtual void OnFailRequest(int /*failstate*/) { + } - virtual void OnFailRequestEx(const TFailLogData& d) { - OnFailRequest(d.failstate); - } + virtual void OnFailRequestEx(const TFailLogData& d) { + OnFailRequest(d.failstate); + } - virtual void OnException() { - } + virtual void OnException() { + } - virtual void OnMaxConn() { - } + virtual void OnMaxConn() { + } - virtual TClientRequest* CreateClient() = 0; + virtual TClientRequest* CreateClient() = 0; - virtual void OnListenStart() { - } + virtual void OnListenStart() { + } - virtual void OnListenStop() { - } + virtual void OnListenStop() { + } - virtual void OnWait() { - } + virtual void OnWait() { + } virtual void* CreateThreadSpecificResource() { return nullptr; - } + } - virtual void DestroyThreadSpecificResource(void*) { - } - }; + virtual void DestroyThreadSpecificResource(void*) { + } + }; - typedef THttpServerOptions TOptions; + typedef THttpServerOptions TOptions; typedef TSimpleSharedPtr<IThreadPool> TMtpQueueRef; THttpServer(ICallBack* cb, const TOptions& options = TOptions(), IThreadFactory* pool = nullptr); THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options = TOptions()); - virtual ~THttpServer(); + virtual ~THttpServer(); - bool Start(); + bool Start(); - // shutdown a.s.a.p. - void Stop(); + // shutdown a.s.a.p. + void Stop(); // graceful shutdown with serving all already open connections - void Shutdown(); + void Shutdown(); - void Wait(); - int GetErrorCode(); - const char* GetError(); - void RestartRequestThreads(ui32 nTh, ui32 maxQS); + void Wait(); + int GetErrorCode(); + const char* GetError(); + void RestartRequestThreads(ui32 nTh, ui32 maxQS); const TOptions& Options() const noexcept; - i64 GetClientCount() const; + i64 GetClientCount() const; - class TImpl; + class TImpl; size_t GetRequestQueueSize() const; size_t GetFailQueueSize() const; @@ -92,30 +92,30 @@ public: static TAtomicBase AcceptReturnsInvalidSocketCounter(); -private: +private: bool MaxRequestsReached() const; private: - THolder<TImpl> Impl_; + THolder<TImpl> Impl_; }; /** * @deprecated Use TRequestReplier instead */ class TClientRequest: public IObjectInQueue { - friend class THttpServer::TImpl; + friend class THttpServer::TImpl; -public: - TClientRequest(); +public: + TClientRequest(); ~TClientRequest() override; inline THttpInput& Input() noexcept { - return *HttpConn_->Input(); - } + return *HttpConn_->Input(); + } inline THttpOutput& Output() noexcept { - return *HttpConn_->Output(); - } + return *HttpConn_->Output(); + } THttpServer* HttpServ() const noexcept; const TSocket& Socket() const noexcept; @@ -123,29 +123,29 @@ public: TInstant AcceptMoment() const noexcept; bool IsLocal() const; - bool CheckLoopback(); - void ProcessFailRequest(int failstate); + bool CheckLoopback(); + void ProcessFailRequest(int failstate); void ReleaseConnection(); void ResetConnection(); -private: - /* - * Processes the request after 'connection' been created and 'Headers' been read - * Returns 'false' if the processing must be continued by the next handler, - * 'true' otherwise ('this' will be deleted) - */ - virtual bool Reply(void* ThreadSpecificResource); +private: + /* + * Processes the request after 'connection' been created and 'Headers' been read + * Returns 'false' if the processing must be continued by the next handler, + * 'true' otherwise ('this' will be deleted) + */ + virtual bool Reply(void* ThreadSpecificResource); void Process(void* ThreadSpecificResource) override; -public: +public: TVector<std::pair<TString, TString>> ParsedHeaders; TString RequestString; -private: - THolder<TClientConnection> Conn_; - THolder<THttpServerConn> HttpConn_; +private: + THolder<TClientConnection> Conn_; + THolder<THttpServerConn> HttpConn_; }; class TRequestReplier: public TClientRequest { diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index 5f21fc935c..cc62bb988e 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -330,7 +330,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); for (int i = 0; i < 2; ++i) { - UNIT_ASSERT(server.Start()); + UNIT_ASSERT(server.Start()); TTestRequest r(port); r.Content = res; diff --git a/library/cpp/http/server/ut/ya.make b/library/cpp/http/server/ut/ya.make index 47a208e867..bcb4d4c0b8 100644 --- a/library/cpp/http/server/ut/ya.make +++ b/library/cpp/http/server/ut/ya.make @@ -1,6 +1,6 @@ UNITTEST_FOR(library/cpp/http/server) -OWNER(pg) +OWNER(pg) SIZE(MEDIUM) diff --git a/library/cpp/lfalloc/lf_allocX64.h b/library/cpp/lfalloc/lf_allocX64.h index 979e26c699..fd2a906d6f 100644 --- a/library/cpp/lfalloc/lf_allocX64.h +++ b/library/cpp/lfalloc/lf_allocX64.h @@ -98,17 +98,17 @@ static inline long AtomicSub(TAtomic& a, long b) { #include <new> #include <errno.h> -#if defined(_linux_) +#if defined(_linux_) #include <linux/futex.h> #include <sys/syscall.h> #if !defined(MADV_HUGEPAGE) #define MADV_HUGEPAGE 14 -#endif +#endif #if !defined(MAP_HUGETLB) #define MAP_HUGETLB 0x40000 #endif #endif - + #define PERTHREAD __thread #endif diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h index 573458ba72..b79c0cc355 100644 --- a/library/cpp/messagebus/config/netaddr.h +++ b/library/cpp/messagebus/config/netaddr.h @@ -1,6 +1,6 @@ #pragma once -#include <util/digest/numeric.h> +#include <util/digest/numeric.h> #include <util/generic/hash.h> #include <util/generic/ptr.h> #include <util/generic/strbuf.h> @@ -74,13 +74,13 @@ namespace NBus { switch (s->sa_family) { case AF_INET: return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa->sin_addr), sizeof(sa->sin_addr))), IntHashImpl(sa->sin_port)); - + case AF_INET6: return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa6->sin6_addr), sizeof(sa6->sin6_addr))), IntHashImpl(sa6->sin6_port)); } - + return ComputeHash(TStringBuf(reinterpret_cast<const char*>(s), a.Len())); - } + } }; - + } diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp index d9436f15d7..d9411bb5db 100644 --- a/library/cpp/messagebus/coreconn.cpp +++ b/library/cpp/messagebus/coreconn.cpp @@ -2,10 +2,10 @@ #include "remote_connection.h" -#include <util/datetime/base.h> +#include <util/datetime/base.h> #include <util/generic/yexception.h> #include <util/network/socket.h> -#include <util/string/util.h> +#include <util/string/util.h> #include <util/system/thread.h> namespace NBus { diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h index f6ec07bef4..fca228d82e 100644 --- a/library/cpp/messagebus/coreconn.h +++ b/library/cpp/messagebus/coreconn.h @@ -5,24 +5,24 @@ /// \brief Definitions for asynchonous connection queue #include "base.h" -#include "event_loop.h" +#include "event_loop.h" #include "netaddr.h" -#include <util/datetime/base.h> +#include <util/datetime/base.h> #include <util/generic/algorithm.h> #include <util/generic/list.h> -#include <util/generic/map.h> -#include <util/generic/set.h> +#include <util/generic/map.h> +#include <util/generic/set.h> #include <util/generic/string.h> #include <util/generic/vector.h> #include <util/network/address.h> #include <util/network/ip.h> -#include <util/network/poller.h> -#include <util/string/util.h> -#include <util/system/condvar.h> +#include <util/network/poller.h> +#include <util/string/util.h> +#include <util/system/condvar.h> #include <util/system/mutex.h> #include <util/system/thread.h> -#include <util/thread/lfqueue.h> +#include <util/thread/lfqueue.h> #include <deque> #include <utility> @@ -31,9 +31,9 @@ #undef NO_ERROR #endif -#define BUS_WORKER_CONDVAR -//#define BUS_WORKER_MIXED - +#define BUS_WORKER_CONDVAR +//#define BUS_WORKER_MIXED + namespace NBus { class TBusConnection; class TBusConnectionFactory; @@ -64,4 +64,4 @@ namespace NBus { POLL_WRITE }; -} +} diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp index b1209d2b5c..f685135bed 100644 --- a/library/cpp/messagebus/event_loop.cpp +++ b/library/cpp/messagebus/event_loop.cpp @@ -5,50 +5,50 @@ #include <util/generic/hash.h> #include <util/network/pair.h> -#include <util/network/poller.h> +#include <util/network/poller.h> #include <util/system/event.h> #include <util/system/mutex.h> #include <util/system/thread.h> -#include <util/system/yassert.h> +#include <util/system/yassert.h> #include <util/thread/lfqueue.h> - + #include <errno.h> -using namespace NEventLoop; - -namespace { +using namespace NEventLoop; + +namespace { enum ERunningState { EVENT_LOOP_CREATED, EVENT_LOOP_RUNNING, EVENT_LOOP_STOPPED, }; - enum EOperation { - OP_READ = 1, - OP_WRITE = 2, - OP_READ_WRITE = OP_READ | OP_WRITE, - }; -} - -class TChannel::TImpl { -public: + enum EOperation { + OP_READ = 1, + OP_WRITE = 2, + OP_READ_WRITE = OP_READ | OP_WRITE, + }; +} + +class TChannel::TImpl { +public: TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie); ~TImpl(); - - void EnableRead(); - void DisableRead(); - void EnableWrite(); - void DisableWrite(); - - void Unregister(); - - SOCKET GetSocket() const; + + void EnableRead(); + void DisableRead(); + void EnableWrite(); + void DisableWrite(); + + void Unregister(); + + SOCKET GetSocket() const; TSocket GetSocketPtr() const; - + void Update(int pollerFlags, bool enable); void CallHandler(); - TEventLoop::TImpl* EventLoop; + TEventLoop::TImpl* EventLoop; TSocket Socket; TEventHandlerPtr EventHandler; void* Cookie; @@ -57,130 +57,130 @@ public: int CurrentFlags; bool Close; -}; - -class TEventLoop::TImpl { -public: +}; + +class TEventLoop::TImpl { +public: TImpl(const char* name); - - void Run(); - void Wakeup(); - void Stop(); - + + void Run(); + void Wakeup(); + void Stop(); + TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie); - void Unregister(SOCKET socket); - + void Unregister(SOCKET socket); + typedef THashMap<SOCKET, TChannelPtr> TData; - + void AddToPoller(SOCKET socket, void* cookie, int flags); - + TMutex Mutex; - + const char* Name; TAtomic RunningState; TAtomic StopSignal; TSystemEvent StoppedEvent; - TData Data; - + TData Data; + TLockFreeQueue<SOCKET> SocketsToRemove; - TSocketPoller Poller; - TSocketHolder WakeupReadSocket; - TSocketHolder WakeupWriteSocket; -}; - -TChannel::~TChannel() { -} - -void TChannel::EnableRead() { - Impl->EnableRead(); -} - -void TChannel::DisableRead() { - Impl->DisableRead(); -} - -void TChannel::EnableWrite() { - Impl->EnableWrite(); -} - -void TChannel::DisableWrite() { - Impl->DisableWrite(); -} - -void TChannel::Unregister() { - Impl->Unregister(); -} - -SOCKET TChannel::GetSocket() const { - return Impl->GetSocket(); -} - + TSocketPoller Poller; + TSocketHolder WakeupReadSocket; + TSocketHolder WakeupWriteSocket; +}; + +TChannel::~TChannel() { +} + +void TChannel::EnableRead() { + Impl->EnableRead(); +} + +void TChannel::DisableRead() { + Impl->DisableRead(); +} + +void TChannel::EnableWrite() { + Impl->EnableWrite(); +} + +void TChannel::DisableWrite() { + Impl->DisableWrite(); +} + +void TChannel::Unregister() { + Impl->Unregister(); +} + +SOCKET TChannel::GetSocket() const { + return Impl->GetSocket(); +} + TSocket TChannel::GetSocketPtr() const { return Impl->GetSocketPtr(); } TChannel::TChannel(TImpl* impl) : Impl(impl) -{ -} - +{ +} + TEventLoop::TEventLoop(const char* name) : Impl(new TImpl(name)) -{ -} - +{ +} + TEventLoop::~TEventLoop() { -} - -void TEventLoop::Run() { - Impl->Run(); -} - -void TEventLoop::Stop() { - Impl->Stop(); -} - +} + +void TEventLoop::Run() { + Impl->Run(); +} + +void TEventLoop::Stop() { + Impl->Stop(); +} + bool TEventLoop::IsRunning() { return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; } TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { return Impl->Register(socket, eventHandler, cookie); -} - +} + TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie) - : EventLoop(eventLoop) - , Socket(socket) + : EventLoop(eventLoop) + , Socket(socket) , EventHandler(eventHandler) , Cookie(cookie) , CurrentFlags(0) , Close(false) -{ -} - +{ +} + TChannel::TImpl::~TImpl() { Y_ASSERT(Close); } -void TChannel::TImpl::EnableRead() { +void TChannel::TImpl::EnableRead() { Update(OP_READ, true); -} - -void TChannel::TImpl::DisableRead() { +} + +void TChannel::TImpl::DisableRead() { Update(OP_READ, false); -} - -void TChannel::TImpl::EnableWrite() { +} + +void TChannel::TImpl::EnableWrite() { Update(OP_WRITE, true); -} - -void TChannel::TImpl::DisableWrite() { +} + +void TChannel::TImpl::DisableWrite() { Update(OP_WRITE, false); -} - -void TChannel::TImpl::Unregister() { +} + +void TChannel::TImpl::Unregister() { TGuard<TMutex> guard(Mutex); if (Close) { @@ -196,8 +196,8 @@ void TChannel::TImpl::Unregister() { EventLoop->SocketsToRemove.Enqueue(Socket); EventLoop->Wakeup(); -} - +} + void TChannel::TImpl::Update(int flags, bool enable) { TGuard<TMutex> guard(Mutex); @@ -221,10 +221,10 @@ void TChannel::TImpl::Update(int flags, bool enable) { CurrentFlags = newFlags; } -SOCKET TChannel::TImpl::GetSocket() const { - return Socket; -} - +SOCKET TChannel::TImpl::GetSocket() const { + return Socket; +} + TSocket TChannel::TImpl::GetSocketPtr() const { return Socket; } @@ -256,27 +256,27 @@ TEventLoop::TImpl::TImpl(const char* name) : Name(name) , RunningState(EVENT_LOOP_CREATED) , StopSignal(0) -{ - SOCKET wakeupSockets[2]; - +{ + SOCKET wakeupSockets[2]; + if (SocketPair(wakeupSockets) < 0) { Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText()); } - - TSocketHolder wakeupReadSocket(wakeupSockets[0]); - TSocketHolder wakeupWriteSocket(wakeupSockets[1]); - - WakeupReadSocket.Swap(wakeupReadSocket); - WakeupWriteSocket.Swap(wakeupWriteSocket); - - SetNonBlock(WakeupWriteSocket, true); - SetNonBlock(WakeupReadSocket, true); - - Poller.WaitRead(WakeupReadSocket, + + TSocketHolder wakeupReadSocket(wakeupSockets[0]); + TSocketHolder wakeupWriteSocket(wakeupSockets[1]); + + WakeupReadSocket.Swap(wakeupReadSocket); + WakeupWriteSocket.Swap(wakeupWriteSocket); + + SetNonBlock(WakeupWriteSocket, true); + SetNonBlock(WakeupReadSocket, true); + + Poller.WaitRead(WakeupReadSocket, reinterpret_cast<void*>(this)); -} - -void TEventLoop::TImpl::Run() { +} + +void TEventLoop::TImpl::Run() { bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); Y_VERIFY(res, "Invalid mbus event loop state"); @@ -285,30 +285,30 @@ void TEventLoop::TImpl::Run() { } while (AtomicGet(StopSignal) == 0) { - void* cookies[1024]; + void* cookies[1024]; const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies)); - - void** end = cookies + count; - for (void** c = cookies; c != end; ++c) { + + void** end = cookies + count; + for (void** c = cookies; c != end; ++c) { TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c); - + if (*c == this) { - char buf[0x1000]; + char buf[0x1000]; if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) { Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText()); } - continue; - } - + continue; + } + s->CallHandler(); - } + } SOCKET socket = -1; while (SocketsToRemove.Dequeue(&socket)) { TGuard<TMutex> guard(Mutex); Y_VERIFY(Data.erase(socket) == 1, "must be removed once"); } - } + } { TGuard<TMutex> guard(Mutex); @@ -325,9 +325,9 @@ void TEventLoop::TImpl::Run() { Y_VERIFY(res); StoppedEvent.Signal(); -} - -void TEventLoop::TImpl::Stop() { +} + +void TEventLoop::TImpl::Stop() { AtomicSet(StopSignal, 1); if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { @@ -335,36 +335,36 @@ void TEventLoop::TImpl::Stop() { StoppedEvent.WaitI(); } -} - +} + TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie)); - + TGuard<TMutex> guard(Mutex); - + Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted"); - + return channel; -} - -void TEventLoop::TImpl::Wakeup() { +} + +void TEventLoop::TImpl::Wakeup() { if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) { if (LastSystemError() != EAGAIN) { Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText()); } - } -} - -void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { - if (flags == OP_READ) { + } +} + +void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { + if (flags == OP_READ) { Poller.WaitReadOneShot(socket, cookie); - } else if (flags == OP_WRITE) { + } else if (flags == OP_WRITE) { Poller.WaitWriteOneShot(socket, cookie); - } else if (flags == OP_READ_WRITE) { + } else if (flags == OP_READ_WRITE) { Poller.WaitReadWriteOneShot(socket, cookie); - } else { + } else { Y_FAIL("Wrong flags: %d", int(flags)); - } -} + } +} diff --git a/library/cpp/messagebus/event_loop.h b/library/cpp/messagebus/event_loop.h index 677ade2fff..d5b0a53b0c 100644 --- a/library/cpp/messagebus/event_loop.h +++ b/library/cpp/messagebus/event_loop.h @@ -1,72 +1,72 @@ -#pragma once - +#pragma once + #include <util/generic/object_counter.h> -#include <util/generic/ptr.h> -#include <util/network/init.h> +#include <util/generic/ptr.h> +#include <util/network/init.h> #include <util/network/socket.h> - -namespace NEventLoop { - struct IEventHandler + +namespace NEventLoop { + struct IEventHandler : public TAtomicRefCount<IEventHandler> { virtual void HandleEvent(SOCKET socket, void* cookie) = 0; virtual ~IEventHandler() { } - }; - - typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr; - - class TEventLoop; - + }; + + typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr; + + class TEventLoop; + // TODO: make TChannel itself a pointer // to avoid confusion with Drop and Unregister - class TChannel + class TChannel : public TAtomicRefCount<TChannel> { - public: - ~TChannel(); - - void EnableRead(); - void DisableRead(); - void EnableWrite(); - void DisableWrite(); - - void Unregister(); - - SOCKET GetSocket() const; + public: + ~TChannel(); + + void EnableRead(); + void DisableRead(); + void EnableWrite(); + void DisableWrite(); + + void Unregister(); + + SOCKET GetSocket() const; TSocket GetSocketPtr() const; - - private: - class TImpl; - friend class TEventLoop; - + + private: + class TImpl; + friend class TEventLoop; + TObjectCounter<TChannel> ObjectCounter; TChannel(TImpl*); - - private: - THolder<TImpl> Impl; - }; - - typedef TIntrusivePtr<TChannel> TChannelPtr; - - class TEventLoop { - public: + + private: + THolder<TImpl> Impl; + }; + + typedef TIntrusivePtr<TChannel> TChannelPtr; + + class TEventLoop { + public: TEventLoop(const char* name = nullptr); - ~TEventLoop(); - - void Run(); - void Stop(); + ~TEventLoop(); + + void Run(); + void Stop(); bool IsRunning(); - + TChannelPtr Register(TSocket socket, TEventHandlerPtr, void* cookie = nullptr); - - private: - class TImpl; - friend class TChannel; - + + private: + class TImpl; + friend class TChannel; + TObjectCounter<TEventLoop> ObjectCounter; - private: - THolder<TImpl> Impl; - }; - -} + private: + THolder<TImpl> Impl; + }; + +} diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index 9176496252..3474d62705 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -56,7 +56,7 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e } TBusMessageQueue::~TBusMessageQueue() { - Stop(); + Stop(); } void TBusMessageQueue::Stop() { @@ -127,7 +127,7 @@ TString TBusMessageQueue::GetStatus(ui16 flags) const { TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) { TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name)); Add(session.Get()); - return session.Get(); + return session.Get(); } TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) { @@ -189,10 +189,10 @@ void TBusMessageQueue::DestroyAllSessions() { } } -void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) { - Scheduler.Schedule(i); +void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) { + Scheduler.Schedule(i); } - + TString TBusMessageQueue::GetNameInternal() const { return Config.Name; } diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 8a64811979..24bd778799 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -79,19 +79,19 @@ namespace NBus { TBusModuleImpl* const Module; }; - + struct TModuleServerHandler : public IBusServerHandler { TModuleServerHandler(TBusModuleImpl* module) : Module(module) { } - + void OnMessage(TOnMessageContext& msg) override; - + TBusModuleImpl* const Module; }; - + struct TBusModuleImpl: public TBusModuleInternal { TBusModule* const Module; @@ -677,7 +677,7 @@ namespace NBus { return true; } - + bool TBusModule::Shutdown() { Impl->Shutdown(); @@ -702,7 +702,7 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { Impl->Queue = queue; return true; } - + int TBusModule::GetModuleSessionInFlight() const { return Impl->Size(); } @@ -781,11 +781,11 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) { if (jobCount == 0) { ShutdownCondVar.BroadCast(); } - } + } } job->JobStorageIterator = TList<TJobRunner*>::iterator(); -} +} void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) { TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage(); diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index 1b75c4df46..8d1c4a5d52 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -407,4 +407,4 @@ namespace NBus { TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config); }; -} +} diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp index 7bd6e115c7..3bc421944f 100644 --- a/library/cpp/messagebus/remote_client_session.cpp +++ b/library/cpp/messagebus/remote_client_session.cpp @@ -1,10 +1,10 @@ #include "remote_client_session.h" - + #include "mb_lwtrace.h" #include "remote_client_connection.h" #include <library/cpp/messagebus/scheduler/scheduler.h> - + #include <util/generic/cast.h> #include <util/system/defaults.h> @@ -19,9 +19,9 @@ TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue, : TBusSessionImpl(true, queue, proto, handler, config, name) , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight") , ClientHandler(handler) -{ -} - +{ +} + TRemoteClientSession::~TRemoteClientSession() { //Cerr << "~TRemoteClientSession" << Endl; } @@ -31,7 +31,7 @@ void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps< temp->swap(newMsg); c->ReplyQueue.EnqueueAll(temp); c->ScheduleWrite(); -} +} EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) { if (Y_UNLIKELY(IsDown())) { diff --git a/library/cpp/messagebus/remote_client_session.h b/library/cpp/messagebus/remote_client_session.h index f619d4d86a..7160d0dae9 100644 --- a/library/cpp/messagebus/remote_client_session.h +++ b/library/cpp/messagebus/remote_client_session.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "remote_client_session_semaphore.h" #include "session_impl.h" @@ -14,7 +14,7 @@ namespace NBus { namespace NPrivate { using TRemoteClientSessionPtr = TIntrusivePtr<TRemoteClientSession>; - + class TRemoteClientSession: public TBusClientSession, public TBusSessionImpl { friend class TRemoteClientConnection; friend class TInvokeOnReply; diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 59b58f7797..22932569db 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -7,9 +7,9 @@ #include "remote_client_session.h" #include "remote_server_session.h" #include "session_impl.h" - + #include <library/cpp/messagebus/actor/what_thread_does.h> - + #include <util/generic/cast.h> #include <util/network/init.h> #include <util/system/atomic.h> @@ -44,7 +44,7 @@ namespace NBus { WriterData.Status.ConnectionId = connectionId; WriterData.Status.PeerAddr = PeerAddr; ReaderData.Status.ConnectionId = connectionId; - + const TInstant now = TInstant::Now(); WriterFillStatus(); @@ -70,7 +70,7 @@ namespace NBus { Y_VERIFY(AtomicGet(Down)); Y_VERIFY(SendQueue.Empty()); } - + bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { size_t left = Buffer.Size() - Offset; @@ -137,7 +137,7 @@ namespace NBus { void TRemoteConnection::Shutdown(EMessageStatus status) { ScheduleShutdown(status); - + ReaderData.ShutdownComplete.WaitI(); WriterData.ShutdownComplete.WaitI(); } @@ -145,15 +145,15 @@ namespace NBus { void TRemoteConnection::TryConnect() { Y_FAIL("TryConnect is client connection only operation"); } - + void TRemoteConnection::ScheduleRead() { GetReaderActor()->Schedule(); } - + void TRemoteConnection::ScheduleWrite() { GetWriterActor()->Schedule(); } - + void TRemoteConnection::WriterRotateCounters() { if (!WriterData.TimeToRotateCounters.FetchTask()) { return; @@ -383,7 +383,7 @@ namespace NBus { if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { ReaderData.Status.Incremental.BufferDrops += 1; - + TBuffer temp; // probably should use another constant temp.Reserve(Config.DefaultBufferSize); @@ -391,7 +391,7 @@ namespace NBus { ReaderData.Buffer.Swap(temp); } - + return true; } @@ -406,7 +406,7 @@ namespace NBus { ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); } } - + Y_ASSERT(ReaderData.Buffer.Avail() > 0); ssize_t bytes; @@ -465,27 +465,27 @@ namespace NBus { if (!Session->IsSource_) { message->SendTime = now.MilliSeconds(); } - + WriterData.SendQueue.PushBack(message); } - + void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); - } - + } + void TRemoteConnection::WriterFillInFlight() { // this is hack for TLoadBalancedProtocol WriterFillStatus(); AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); } - + const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { WriterRotateCounters(); WriterFillStatus(); return WriterData.Status; } - + void TRemoteConnection::WriterFillStatus() { if (!!WriterData.Channel) { WriterData.Status.Fd = WriterData.Channel->GetSocket(); @@ -644,11 +644,11 @@ namespace NBus { if (WriterData.Buffer.Capacity() > MaxBufferSize) { WriterData.Status.Incremental.BufferDrops += 1; WriterData.Buffer.Reset(); - } + } WriterData.State = WRITER_FILLING; } - + void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { if (Session->IsSource_) { WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); @@ -662,11 +662,11 @@ namespace NBus { AtomicSet(ReaderData.Down, 1); ScheduleRead(); - + AtomicSet(WriterData.Down, 1); ScheduleWrite(); } - + void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { size_t posForAssertion = buffer.Size(); Proto->Serialize(msg, buffer); @@ -688,12 +688,12 @@ namespace NBus { } } - + void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { size_t pos = data->Size(); - + size_t dataSize; - + bool compressionRequested = msg->IsCompressed(); if (compressionRequested) { @@ -821,20 +821,20 @@ namespace NBus { TBusMessagePtrAndHeader h(r); r->RecvTime = now; - + QuotaConsume(1, header.Size); ReaderData.ReadMessages.push_back(h); if (ReaderData.ReadMessages.size() >= 100) { ReaderFlushMessages(); } - + return true; } void TRemoteConnection::WriterFillBuffer() { Y_ASSERT(WriterData.State == WRITER_FILLING); - + Y_ASSERT(WriterData.Buffer.LeftSize() == 0); if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { @@ -868,7 +868,7 @@ namespace NBus { WriterData.CorkUntil = TInstant::Now() + Config.Cork; } } - + size_t sizeBeforeSerialize = WriterData.Buffer.Size(); TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; @@ -952,7 +952,7 @@ namespace NBus { void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { ResetOneWayFlag(ms); - + WriterData.Status.Incremental.StatusCounter[status] += ms.size(); for (auto m : ms) { Session->InvokeOnError(m, status); diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h index ee0665774d..4538947368 100644 --- a/library/cpp/messagebus/remote_connection.h +++ b/library/cpp/messagebus/remote_connection.h @@ -1,8 +1,8 @@ -#pragma once - +#pragma once + #include "async_result.h" #include "defs.h" -#include "event_loop.h" +#include "event_loop.h" #include "left_right_buffer.h" #include "lfqueue_batch.h" #include "message_ptr_and_header.h" @@ -15,7 +15,7 @@ #include "ybus.h" #include "misc/granup.h" #include "misc/tokenquota.h" - + #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/executor.h> #include <library/cpp/messagebus/actor/queue_for_actor.h> @@ -96,7 +96,7 @@ namespace NBus { void Shutdown(EMessageStatus status); inline const TNetAddr& GetAddr() const noexcept; - + private: friend class TScheduleConnect; friend class TWorkIO; @@ -111,14 +111,14 @@ namespace NBus { bool ReaderProcessBuffer(); bool ReaderFillBuffer(); void ReaderFlushMessages(); - + void ReadQuotaWakeup(); ui32 WriteWakeFlags() const; - + virtual bool NeedInterruptRead() { return false; } - + public: virtual void TryConnect(); void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage); @@ -174,7 +174,7 @@ namespace NBus { void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status); // takes ownership of ms void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status); - + void FireClientConnectionEvent(TClientConnectionEvent::EType); size_t GetInFlight(); @@ -207,14 +207,14 @@ namespace NBus { NEventLoop::TChannelPtr Channel; ui32 SocketVersion; - + TRemoteConnectionWriterStatus Status; TInstant StatusLastSendTime; - + TLocalTasks TimeToRotateCounters; TAtomic InFlight; - + TTimedMessages SendQueue; ui32 AwakeFlags; EWriterState State; @@ -290,5 +290,5 @@ namespace NBus { typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr; - } + } } diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp index 12765ab9b4..6abbf88a60 100644 --- a/library/cpp/messagebus/remote_server_session.cpp +++ b/library/cpp/messagebus/remote_server_session.cpp @@ -21,14 +21,14 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue, : TBusSessionImpl(false, queue, proto, handler, config, name) , ServerOwnedMessages(config.MaxInFlight, config.MaxInFlightBySize, "ServerOwnedMessages") , ServerHandler(handler) -{ +{ if (config.PerConnectionMaxInFlightBySize > 0) { if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize) ythrow yexception() << "too low PerConnectionMaxInFlightBySize value"; } -} - +} + namespace NBus { namespace NPrivate { class TInvokeOnMessage: public IWorkItem { @@ -83,7 +83,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps< JobCount.Add(workQueueTemp.GetVector()->size()); Queue->EnqueueWork(*workQueueTemp.GetVector()); } -} +} void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) { if (Y_UNLIKELY(AtomicGet(Down))) { diff --git a/library/cpp/messagebus/remote_server_session.h b/library/cpp/messagebus/remote_server_session.h index c70dde00e2..f5c266a7f7 100644 --- a/library/cpp/messagebus/remote_server_session.h +++ b/library/cpp/messagebus/remote_server_session.h @@ -1,8 +1,8 @@ -#pragma once - +#pragma once + #include "remote_server_session_semaphore.h" #include "session_impl.h" - + #ifdef _MSC_VER #pragma warning(push) #pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance @@ -12,7 +12,7 @@ namespace NBus { namespace NPrivate { class TRemoteServerSession: public TBusServerSession, public TBusSessionImpl { friend class TRemoteServerConnection; - + private: TObjectCounter<TRemoteServerSession> ObjectCounter; diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp index 5c0686d32a..5a5fe52894 100644 --- a/library/cpp/messagebus/scheduler/scheduler.cpp +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -1,37 +1,37 @@ -#include "scheduler.h" - -#include <util/datetime/base.h> -#include <util/generic/algorithm.h> -#include <util/generic/yexception.h> - +#include "scheduler.h" + +#include <util/datetime/base.h> +#include <util/generic/algorithm.h> +#include <util/generic/yexception.h> + //#include "dummy_debugger.h" using namespace NBus; using namespace NBus::NPrivate; - -class TScheduleDeadlineCompare { -public: + +class TScheduleDeadlineCompare { +public: bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept { return i1->GetScheduleTime() > i2->GetScheduleTime(); - } -}; - -TScheduler::TScheduler() + } +}; + +TScheduler::TScheduler() : StopThread(false) , Thread([&] { this->SchedulerThread(); }) -{ -} - -TScheduler::~TScheduler() { +{ +} + +TScheduler::~TScheduler() { Y_VERIFY(StopThread, "state check"); -} - +} + size_t TScheduler::Size() const { TGuard<TLock> guard(Lock); return Items.size() + (!!NextItem ? 1 : 0); } -void TScheduler::Stop() { +void TScheduler::Stop() { { TGuard<TLock> guard(Lock); Y_VERIFY(!StopThread, "Scheduler already stopped"); @@ -46,28 +46,28 @@ void TScheduler::Stop() { for (auto& item : Items) { item.Destroy(); - } -} - -void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { + } +} + +void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { TGuard<TLock> lock(Lock); if (StopThread) - return; + return; if (!!NextItem) { if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { DoSwap(i, NextItem); } - } + } - Items.push_back(i); + Items.push_back(i); PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); FillNextItem(); CondVar.Signal(); -} - +} + void TScheduler::FillNextItem() { if (!NextItem && !Items.empty()) { PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); @@ -76,22 +76,22 @@ void TScheduler::FillNextItem() { } } -void TScheduler::SchedulerThread() { +void TScheduler::SchedulerThread() { for (;;) { IScheduleItemAutoPtr current; - { + { TGuard<TLock> guard(Lock); if (StopThread) { break; - } + } if (!!NextItem) { CondVar.WaitD(Lock, NextItem->GetScheduleTime()); } else { CondVar.WaitI(Lock); - } + } if (StopThread) { break; @@ -106,7 +106,7 @@ void TScheduler::SchedulerThread() { } current = NextItem.Release(); - } + } current->Do(); current.Destroy(); @@ -115,5 +115,5 @@ void TScheduler::SchedulerThread() { TGuard<TLock> guard(Lock); FillNextItem(); } - } -} + } +} diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h index 996bf30f8c..afcc0de55d 100644 --- a/library/cpp/messagebus/scheduler/scheduler.h +++ b/library/cpp/messagebus/scheduler/scheduler.h @@ -1,16 +1,16 @@ -#pragma once - +#pragma once + #include <library/cpp/threading/future/legacy_future.h> #include <util/datetime/base.h> #include <util/generic/object_counter.h> -#include <util/generic/ptr.h> -#include <util/generic/vector.h> -#include <util/system/atomic.h> +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/system/atomic.h> #include <util/system/condvar.h> #include <util/system/mutex.h> -#include <util/system/thread.h> - +#include <util/system/thread.h> + namespace NBus { namespace NPrivate { class IScheduleItem { @@ -25,30 +25,30 @@ namespace NBus { private: TInstant ScheduleTime; }; - + using IScheduleItemAutoPtr = TAutoPtr<IScheduleItem>; - + class TScheduler { public: TScheduler(); ~TScheduler(); void Stop(); void Schedule(TAutoPtr<IScheduleItem> i); - + size_t Size() const; - + private: void SchedulerThread(); - + void FillNextItem(); - + private: TVector<IScheduleItemAutoPtr> Items; IScheduleItemAutoPtr NextItem; typedef TMutex TLock; TLock Lock; TCondVar CondVar; - + TObjectCounter<TScheduler> ObjectCounter; bool StopThread; @@ -63,6 +63,6 @@ namespace NBus { inline TInstant IScheduleItem::GetScheduleTime() const noexcept { return ScheduleTime; } - + } -} +} diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index 76790221ec..ddf9f360c4 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -14,7 +14,7 @@ using namespace NActor; using namespace NBus; using namespace NBus::NPrivate; using namespace NEventLoop; - + namespace { class TScheduleSession: public IScheduleItem { public: @@ -95,7 +95,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get()) , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get()) , Impl(new TImpl) - , IsSource_(isSource) + , IsSource_(isSource) , Queue(queue) , Proto(proto) , ProtoName(Proto->GetService()) @@ -106,16 +106,16 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro , ReadEventLoop("rd-el") , LastAcceptorId(0) , LastConnectionId(0) - , Down(0) -{ + , Down(0) +{ Impl->DeadAcceptorStatusSummary.Summary = true; ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop)))); WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop)))); Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod))); -} - +} + TBusSessionImpl::~TBusSessionImpl() { Y_VERIFY(Down); Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero())); @@ -160,11 +160,11 @@ void TBusSessionImpl::Shutdown() { TGuard<TMutex> guard(ConnectionsLock); Acceptors.clear(); } - + for (auto& acceptor : acceptors) { acceptor->Shutdown(); - } - + } + // shutdown connections TVector<TRemoteConnectionPtr> cs; GetConnections(&cs); @@ -189,12 +189,12 @@ void TBusSessionImpl::Shutdown() { HandlerUseCountHolder.Reset(); ShutdownCompleteEvent.Signal(); -} - +} + bool TBusSessionImpl::IsDown() { - return static_cast<bool>(AtomicGet(Down)); -} - + return static_cast<bool>(AtomicGet(Down)); +} + size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const { TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); if (!!conn) { @@ -202,8 +202,8 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const { } else { return 0; } -} - +} + void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const { Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); for (size_t i = 0; i < addrs.size(); ++i) { @@ -427,7 +427,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() { } r.Config = Config; - + TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex); StatusData.StatusDumpCached = r; } @@ -490,7 +490,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) { void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) { Y_ASSERT(q == Queue); int actualPort = -1; - + for (const TBindResult& br : bindTo) { if (actualPort == -1) { actualPort = br.Addr.GetPort(); @@ -502,14 +502,14 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu } TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr)); - + TConnectionsGuard guard(ConnectionsLock); InsertAcceptorLockAcquired(acceptor.Get()); - } + } Config.ListenPort = actualPort; -} - +} + void TBusSessionImpl::SendSnapshotToStatusActor() { //Y_ASSERT(ConnectionsLock.IsLocked()); @@ -604,24 +604,24 @@ void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message, TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) { TConnectionsGuard guard(ConnectionsLock); - - TAddrRemoteConnections::const_iterator it = Connections.find(addr); - if (it != Connections.end()) { - return it->second; - } - - if (!create) { - return TRemoteConnectionPtr(); - } - + + TAddrRemoteConnections::const_iterator it = Connections.find(addr); + if (it != Connections.end()) { + return it->second; + } + + if (!create) { + return TRemoteConnectionPtr(); + } + Y_VERIFY(IsSource_, "must be source"); TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr())); InsertConnectionLockAcquired(c.Get()); - - return c; -} - + + return c; +} + void TBusSessionImpl::Cron() { TVector<TRemoteConnectionPtr> connections; GetConnections(&connections); diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h index d980ce6ce3..90ef246ff8 100644 --- a/library/cpp/messagebus/session_impl.h +++ b/library/cpp/messagebus/session_impl.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "acceptor_status.h" #include "async_result.h" #include "event_loop.h" @@ -9,7 +9,7 @@ #include "session_job_count.h" #include "shutdown_state.h" #include "ybus.h" - + #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/queue_in_actor.h> #include <library/cpp/messagebus/monitoring/mon_proto.pb.h> @@ -25,7 +25,7 @@ namespace NBus { typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr; typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr; - + typedef TIntrusivePtr<TAcceptor> TAcceptorPtr; typedef TVector<TAcceptorPtr> TAcceptorsPtrs; @@ -34,7 +34,7 @@ namespace NBus { TVector<TAcceptorPtr> Acceptors; ui64 LastConnectionId; ui64 LastAcceptorId; - + TConnectionsAcceptorsSnapshot(); }; @@ -96,13 +96,13 @@ namespace NBus { const TBusSessionConfig& config, const TString& name); ~TBusSessionImpl() override; - + void Shutdown() override; bool IsDown(); size_t GetInFlightImpl(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; - + void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; @@ -123,29 +123,29 @@ namespace NBus { void StatusUpdateCachedDumpIfNecessary(TInstant now); void Act(TStatusTag); void Act(TConnectionTag); - + TBusProtocol* GetProto() const noexcept override; const TBusSessionConfig* GetConfig() const noexcept override; TBusMessageQueue* GetQueue() const noexcept override; TString GetNameInternal() override; virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0; - + void Listen(int port, TBusMessageQueue* q); void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q); TBusConnection* Accept(SOCKET listen); - + inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() { return this; } inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() { return this; } - + typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections; void SendSnapshotToStatusActor(); - + void InsertConnectionLockAcquired(TRemoteConnection* connection); void InsertAcceptorLockAcquired(TAcceptor* acceptor); @@ -159,7 +159,7 @@ namespace NBus { TAcceptorPtr GetAcceptorById(ui64 id); void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus); - + void Cron(); TBusSessionJobCount JobCount; @@ -193,7 +193,7 @@ namespace NBus { struct TStatusData { TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot; ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue; - + TAtomicShutdownState ShutdownState; TBusSessionStatus Status; @@ -246,14 +246,14 @@ namespace NBus { inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { return Proto; } - + inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept { return &Config; } - + inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept { return Queue; } - - } + + } } diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp index 6743f0abe4..efefc87340 100644 --- a/library/cpp/messagebus/storage.cpp +++ b/library/cpp/messagebus/storage.cpp @@ -6,7 +6,7 @@ namespace NBus { namespace NPrivate { TTimedMessages::TTimedMessages() { } - + TTimedMessages::~TTimedMessages() { Y_VERIFY(Items.empty()); } @@ -33,14 +33,14 @@ namespace NBus { size_t TTimedMessages::Size() const { return Items.size(); } - + void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) { // shortcut if (before == TInstant::Max()) { Clear(r); return; } - + while (!Items.empty()) { TItem& i = *Items.front(); if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) { @@ -50,14 +50,14 @@ namespace NBus { Items.pop_front(); } } - + void TTimedMessages::Clear(TMessagesPtrs* r) { while (!Items.empty()) { r->push_back(Items.front()->Message.Release()); Items.pop_front(); } } - + TSyncAckMessages::TSyncAckMessages() { KeyToMessage.set_empty_key(0); KeyToMessage.set_deleted_key(1); @@ -66,14 +66,14 @@ namespace NBus { TSyncAckMessages::~TSyncAckMessages() { Y_VERIFY(KeyToMessage.empty()); Y_VERIFY(TimedItems.empty()); - } - + } + void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) { // Perform garbage collection if `TimedMessages` contain too many junk data if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) { Gc(); } - + TValue value = {m.MessagePtr.Release()}; std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value)); @@ -95,7 +95,7 @@ namespace NBus { return v.Message; } - + void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) { // shortcut if (before == TInstant::Max()) { @@ -110,7 +110,7 @@ namespace NBus { if (TInstant::MilliSeconds(i.SendTime) > before) { break; } - + TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key); if (itMessage != KeyToMessage.end()) { @@ -133,7 +133,7 @@ namespace NBus { void TSyncAckMessages::Gc() { TDeque<TTimedItem> tmp; - + for (auto& timedItem : TimedItems) { if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) { continue; @@ -143,7 +143,7 @@ namespace NBus { TimedItems.swap(tmp); } - + void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) { for (auto message : messages) { TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id); @@ -158,4 +158,4 @@ namespace NBus { } } -} +} diff --git a/library/cpp/messagebus/storage.h b/library/cpp/messagebus/storage.h index 3f8de480a1..7d168844ed 100644 --- a/library/cpp/messagebus/storage.h +++ b/library/cpp/messagebus/storage.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "message_ptr_and_header.h" #include "moved.h" #include "ybus.h" @@ -18,7 +18,7 @@ namespace NBus { public: TTimedMessages(); ~TTimedMessages(); - + struct TItem { THolder<TBusMessage> Message; @@ -36,31 +36,31 @@ namespace NBus { void Timeout(TInstant before, TMessagesPtrs* r); void Clear(TMessagesPtrs* r); - + private: TItems Items; }; - + class TSyncAckMessages : TNonCopyable { public: TSyncAckMessages(); ~TSyncAckMessages(); - + void Push(TBusMessagePtrAndHeader& m); TBusMessage* Pop(TBusKey id); void Timeout(TInstant before, TMessagesPtrs* r); void Clear(TMessagesPtrs* r); - + size_t Size() const { return KeyToMessage.size(); } - + void RemoveAll(const TMessagesPtrs&); - + void Gc(); - + void DumpState(); private: diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index e4ea37f0f3..8489319278 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -605,10 +605,10 @@ int main(int argc, char* argv[]) { opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); - opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); - opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); - opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); - opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); + opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); + opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); + opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); + opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") .RequiredArgument("BOOL") diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index a358339513..040f9b7702 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -1,5 +1,5 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/fixed_port.h> #include <library/cpp/messagebus/test/helper/hanging_server.h> @@ -13,7 +13,7 @@ #include <utility> using namespace NBus; -using namespace NBus::NTest; +using namespace NBus::NTest; namespace { struct TExampleClientSlowOnMessageSent: public TExampleClient { diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index bb21cf1c99..de21ad8521 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -193,7 +193,7 @@ namespace NBus { void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session); void Remove(TBusSession* session); }; - + ///////////////////////////////////////////////////////////////// /// Factory methods to construct message queue TBusMessageQueuePtr CreateMessageQueue(const char* name = ""); diff --git a/library/cpp/monlib/service/service.h b/library/cpp/monlib/service/service.h index ccf09fefab..2f66dddaf8 100644 --- a/library/cpp/monlib/service/service.h +++ b/library/cpp/monlib/service/service.h @@ -5,7 +5,7 @@ #include <library/cpp/http/fetch/httpheader.h> #include <library/cpp/http/server/http.h> #include <library/cpp/logger/all.h> - + #include <util/network/ip.h> #include <library/cpp/cgiparam/cgiparam.h> diff --git a/library/cpp/packedtypes/packed.h b/library/cpp/packedtypes/packed.h index a7386b7828..88cff26ae2 100644 --- a/library/cpp/packedtypes/packed.h +++ b/library/cpp/packedtypes/packed.h @@ -5,8 +5,8 @@ #include <util/stream/output.h> #include <util/ysaveload.h> -#include "longs.h" - +#include "longs.h" + struct Stream_traits { template <typename T> static T get(IInputStream& in) { diff --git a/library/cpp/packedtypes/ya.make b/library/cpp/packedtypes/ya.make index 8af6d0eab6..4c2c950619 100644 --- a/library/cpp/packedtypes/ya.make +++ b/library/cpp/packedtypes/ya.make @@ -1,21 +1,21 @@ -LIBRARY() - +LIBRARY() + OWNER( akhropov velavokr ) -PEERDIR( +PEERDIR( library/cpp/streams/zc_memory_input -) - -SRCS( - fixed_point.h +) + +SRCS( + fixed_point.h longs.cpp - packed.h - packedfloat.cpp - packedfloat.h + packed.h + packedfloat.cpp + packedfloat.h zigzag.h -) - -END() +) + +END() diff --git a/library/cpp/sighandler/ya.make b/library/cpp/sighandler/ya.make index 3fd5244787..c0f7ea6084 100644 --- a/library/cpp/sighandler/ya.make +++ b/library/cpp/sighandler/ya.make @@ -1,10 +1,10 @@ -LIBRARY() - +LIBRARY() + OWNER(pg) - -SRCS( - async_signals_handler.cpp - async_signals_handler.h -) - -END() + +SRCS( + async_signals_handler.cpp + async_signals_handler.h +) + +END() diff --git a/library/cpp/string_utils/url/url.cpp b/library/cpp/string_utils/url/url.cpp index 988122e5d1..85f4ac5d69 100644 --- a/library/cpp/string_utils/url/url.cpp +++ b/library/cpp/string_utils/url/url.cpp @@ -312,28 +312,28 @@ TStringBuf CutMPrefix(const TStringBuf url) noexcept { static inline bool IsSchemeChar(char c) noexcept { return IsAsciiAlnum(c); //what about '+' ?.. -} - +} + static bool HasPrefix(const TStringBuf url) noexcept { TStringBuf scheme, unused; if (!url.TrySplit(TStringBuf("://"), scheme, unused)) - return false; - + return false; + return AllOf(scheme, IsSchemeChar); -} - +} + TString AddSchemePrefix(const TString& url) { return AddSchemePrefix(url, TStringBuf("http")); } TString AddSchemePrefix(const TString& url, TStringBuf scheme) { - if (HasPrefix(url)) { - return url; - } - + if (HasPrefix(url)) { + return url; + } + return TString::Join(scheme, TStringBuf("://"), url); -} - +} + #define X(c) (c >= 'A' ? ((c & 0xdf) - 'A') + 10 : (c - '0')) static inline int x2c(unsigned char* x) { diff --git a/library/cpp/string_utils/url/url.h b/library/cpp/string_utils/url/url.h index c5c550ea6b..84137ccc57 100644 --- a/library/cpp/string_utils/url/url.h +++ b/library/cpp/string_utils/url/url.h @@ -56,10 +56,10 @@ TWtringBuf CutHttpPrefix(const TWtringBuf url, bool ignorehttps = false) noexcep Y_PURE_FUNCTION TStringBuf CutSchemePrefix(const TStringBuf url) noexcept; -//! adds specified scheme prefix if URL has no scheme -//! @note if URL has scheme prefix already the function returns unchanged URL +//! adds specified scheme prefix if URL has no scheme +//! @note if URL has scheme prefix already the function returns unchanged URL TString AddSchemePrefix(const TString& url, const TStringBuf scheme); - + //! Same as `AddSchemePrefix(url, "http")`. TString AddSchemePrefix(const TString& url); diff --git a/library/cpp/string_utils/url/url_ut.cpp b/library/cpp/string_utils/url/url_ut.cpp index 02a0de9e5f..1588013893 100644 --- a/library/cpp/string_utils/url/url_ut.cpp +++ b/library/cpp/string_utils/url/url_ut.cpp @@ -69,14 +69,14 @@ Y_UNIT_TEST_SUITE(TUtilUrlTest) { UNIT_ASSERT_VALUES_EQUAL("FHFBN", GetZone("ya.FHFBN")); UNIT_ASSERT_VALUES_EQUAL("", GetZone("")); } - + Y_UNIT_TEST(TestAddSchemePrefix) { - UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("yandex.ru")); - UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("http://yandex.ru")); - UNIT_ASSERT_VALUES_EQUAL("https://yandex.ru", AddSchemePrefix("https://yandex.ru")); - UNIT_ASSERT_VALUES_EQUAL("file://yandex.ru", AddSchemePrefix("file://yandex.ru")); - UNIT_ASSERT_VALUES_EQUAL("ftp://ya.ru", AddSchemePrefix("ya.ru", "ftp")); - } + UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("yandex.ru")); + UNIT_ASSERT_VALUES_EQUAL("http://yandex.ru", AddSchemePrefix("http://yandex.ru")); + UNIT_ASSERT_VALUES_EQUAL("https://yandex.ru", AddSchemePrefix("https://yandex.ru")); + UNIT_ASSERT_VALUES_EQUAL("file://yandex.ru", AddSchemePrefix("file://yandex.ru")); + UNIT_ASSERT_VALUES_EQUAL("ftp://ya.ru", AddSchemePrefix("ya.ru", "ftp")); + } Y_UNIT_TEST(TestSchemeGet) { UNIT_ASSERT_VALUES_EQUAL("http://", GetSchemePrefix("http://ya.ru/bebe")); diff --git a/library/cpp/testing/unittest/utmain.cpp b/library/cpp/testing/unittest/utmain.cpp index a037e84c55..305bc6b40f 100644 --- a/library/cpp/testing/unittest/utmain.cpp +++ b/library/cpp/testing/unittest/utmain.cpp @@ -742,13 +742,13 @@ int NUnitTest::RunMain(int argc, char** argv) { processor.Enable(name); } } - } + } if (listTests != DONT_LIST) { return DoList(listTests == LIST_VERBOSE, *listStream); } TTestFactory::Instance().SetProcessor(&processor); - + unsigned ret; for (;;) { ret = TTestFactory::Instance().Execute(); diff --git a/library/cpp/threading/future/async.h b/library/cpp/threading/future/async.h index 0f3a741372..8543fdd5c6 100644 --- a/library/cpp/threading/future/async.h +++ b/library/cpp/threading/future/async.h @@ -20,10 +20,10 @@ namespace NThreading { template <typename Func> TFuture<TFutureType<TFunctionResult<Func>>> Async(Func&& func, IThreadPool& queue) { auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>(); - auto lambda = [promise, func = std::forward<Func>(func)]() mutable { + auto lambda = [promise, func = std::forward<Func>(func)]() mutable { NImpl::SetValue(promise, func); - }; - queue.SafeAddFunc(std::move(lambda)); + }; + queue.SafeAddFunc(std::move(lambda)); return promise.GetFuture(); } diff --git a/library/cpp/unicode/punycode/punycode_ut.cpp b/library/cpp/unicode/punycode/punycode_ut.cpp index 6724d828c8..97271cf0d8 100644 --- a/library/cpp/unicode/punycode/punycode_ut.cpp +++ b/library/cpp/unicode/punycode/punycode_ut.cpp @@ -3,19 +3,19 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/charset/wide.h> -namespace { - template<typename T1, typename T2> - inline bool HasSameBuffer(const T1& s1, const T2& s2) { - return s1.begin() == s2.begin(); - } -} - +namespace { + template<typename T1, typename T2> + inline bool HasSameBuffer(const T1& s1, const T2& s2) { + return s1.begin() == s2.begin(); + } +} + Y_UNIT_TEST_SUITE(TPunycodeTest) { static bool TestRaw(const TString& utf8, const TString& punycode) { TUtf16String unicode = UTF8ToWide(utf8); TString buf1; TUtf16String buf2; - return HasSameBuffer(WideToPunycode(unicode, buf1), buf1) && buf1 == punycode && HasSameBuffer(PunycodeToWide(punycode, buf2), buf2) && buf2 == unicode && WideToPunycode(unicode) == punycode && PunycodeToWide(punycode) == unicode; + return HasSameBuffer(WideToPunycode(unicode, buf1), buf1) && buf1 == punycode && HasSameBuffer(PunycodeToWide(punycode, buf2), buf2) && buf2 == unicode && WideToPunycode(unicode) == punycode && PunycodeToWide(punycode) == unicode; } Y_UNIT_TEST(RawEncodeDecode) { diff --git a/library/cpp/yson_pull/scalar.h b/library/cpp/yson_pull/scalar.h index e1ea5d41cb..509fce8b5e 100644 --- a/library/cpp/yson_pull/scalar.h +++ b/library/cpp/yson_pull/scalar.h @@ -26,7 +26,7 @@ namespace NYsonPull { size_t Size; }; - ui8 AsNothing[1]; + ui8 AsNothing[1]; bool AsBoolean; i64 AsInt64; ui64 AsUInt64; diff --git a/library/python/symbols/libc/ya.make b/library/python/symbols/libc/ya.make index e20d063505..7b84cbc961 100644 --- a/library/python/symbols/libc/ya.make +++ b/library/python/symbols/libc/ya.make @@ -6,7 +6,7 @@ PEERDIR( library/python/symbols/registry ) -IF (GCC OR CLANG) +IF (GCC OR CLANG) CFLAGS( -Wno-deprecated-declarations # For sem_getvalue. ) |