diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect/poller_tcp_unit.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/poller_tcp_unit.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/poller_tcp_unit.cpp | 192 |
1 files changed, 96 insertions, 96 deletions
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.cpp b/library/cpp/actors/interconnect/poller_tcp_unit.cpp index 36180353b6..59e7dda810 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit.cpp +++ b/library/cpp/actors/interconnect/poller_tcp_unit.cpp @@ -1,7 +1,7 @@ #include "poller_tcp_unit.h" #if !defined(_win_) && !defined(_darwin_) -#include "poller_tcp_unit_epoll.h" +#include "poller_tcp_unit_epoll.h" #endif #include "poller_tcp_unit_select.h" @@ -11,116 +11,116 @@ #include <library/cpp/actors/util/intrinsics.h> #if defined _linux_ -#include <pthread.h> +#include <pthread.h> #endif namespace NInterconnect { - TPollerUnit::TPtr - TPollerUnit::Make(bool useSelect) { + TPollerUnit::TPtr + TPollerUnit::Make(bool useSelect) { #if defined(_win_) || defined(_darwin_) - Y_UNUSED(useSelect); - return TPtr(new TPollerUnitSelect); + Y_UNUSED(useSelect); + return TPtr(new TPollerUnitSelect); #else - return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll); + return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll); #endif - } - - TPollerUnit::TPollerUnit() - : StopFlag(true) - , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read")) - , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write")) - { - } - - TPollerUnit::~TPollerUnit() { - if (!AtomicLoad(&StopFlag)) - Stop(); - } - - void - TPollerUnit::Start() { - AtomicStore(&StopFlag, false); - ReadLoop.Start(); - WriteLoop.Start(); - } - - void - TPollerUnit::Stop() { - AtomicStore(&StopFlag, true); - ReadLoop.Join(); - WriteLoop.Join(); - } - - template <> - TPollerUnit::TSide& - TPollerUnit::GetSide<false>() { - return Read; - } - - template <> - TPollerUnit::TSide& - TPollerUnit::GetSide<true>() { - return Write; - } - - void - TPollerUnit::StartReadOperation( + } + + TPollerUnit::TPollerUnit() + : StopFlag(true) + , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read")) + , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write")) + { + } + + TPollerUnit::~TPollerUnit() { + if (!AtomicLoad(&StopFlag)) + Stop(); + } + + void + TPollerUnit::Start() { + AtomicStore(&StopFlag, false); + ReadLoop.Start(); + WriteLoop.Start(); + } + + void + TPollerUnit::Stop() { + AtomicStore(&StopFlag, true); + ReadLoop.Join(); + WriteLoop.Join(); + } + + template <> + TPollerUnit::TSide& + TPollerUnit::GetSide<false>() { + return Read; + } + + template <> + TPollerUnit::TSide& + TPollerUnit::GetSide<true>() { + return Write; + } + + void + TPollerUnit::StartReadOperation( const TIntrusivePtr<TSharedDescriptor>& stream, - TFDDelegate&& operation) { - Y_VERIFY_DEBUG(stream); - if (AtomicLoad(&StopFlag)) - return; - GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); - } - - void - TPollerUnit::StartWriteOperation( + TFDDelegate&& operation) { + Y_VERIFY_DEBUG(stream); + if (AtomicLoad(&StopFlag)) + return; + GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); + } + + void + TPollerUnit::StartWriteOperation( const TIntrusivePtr<TSharedDescriptor>& stream, - TFDDelegate&& operation) { - Y_VERIFY_DEBUG(stream); - if (AtomicLoad(&StopFlag)) - return; - GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); - } - - template <bool IsWrite> - void* - TPollerUnit::IdleThread(void* param) { + TFDDelegate&& operation) { + Y_VERIFY_DEBUG(stream); + if (AtomicLoad(&StopFlag)) + return; + GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); + } + + template <bool IsWrite> + void* + TPollerUnit::IdleThread(void* param) { // TODO: musl-libc version of `sched_param` struct is for some reason different from pthread // version in Ubuntu 12.04 #if defined(_linux_) && !defined(_musl_) - pthread_t threadSelf = pthread_self(); - sched_param sparam = {20}; - pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam); + pthread_t threadSelf = pthread_self(); + sched_param sparam = {20}; + pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam); #endif - static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>(); - return nullptr; - } + static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>(); + return nullptr; + } - template <> - void - TPollerUnit::RunLoop<false>() { + template <> + void + TPollerUnit::RunLoop<false>() { NProfiling::TMemoryTagScope tag("INTERCONNECT_RECEIVED_DATA"); - while (!AtomicLoad(&StopFlag)) - ProcessRead(); - } + while (!AtomicLoad(&StopFlag)) + ProcessRead(); + } - template <> - void - TPollerUnit::RunLoop<true>() { + template <> + void + TPollerUnit::RunLoop<true>() { NProfiling::TMemoryTagScope tag("INTERCONNECT_SEND_DATA"); - while (!AtomicLoad(&StopFlag)) - ProcessWrite(); - } - - void - TPollerUnit::TSide::ProcessInput() { - if (!InputQueue.IsEmpty()) - do { - auto sock = InputQueue.Top().first->GetDescriptor(); - if (!Operations.emplace(sock, std::move(InputQueue.Top())).second) - Y_FAIL("Descriptor is already in pooler."); - } while (InputQueue.Pop()); - } + while (!AtomicLoad(&StopFlag)) + ProcessWrite(); + } + + void + TPollerUnit::TSide::ProcessInput() { + if (!InputQueue.IsEmpty()) + do { + auto sock = InputQueue.Top().first->GetDescriptor(); + if (!Operations.emplace(sock, std::move(InputQueue.Top())).second) + Y_FAIL("Descriptor is already in pooler."); + } while (InputQueue.Pop()); + } } |