aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/poller_tcp_unit.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect/poller_tcp_unit.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/poller_tcp_unit.cpp')
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit.cpp192
1 files changed, 96 insertions, 96 deletions
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.cpp b/library/cpp/actors/interconnect/poller_tcp_unit.cpp
index 36180353b6..59e7dda810 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit.cpp
+++ b/library/cpp/actors/interconnect/poller_tcp_unit.cpp
@@ -1,7 +1,7 @@
#include "poller_tcp_unit.h"
#if !defined(_win_) && !defined(_darwin_)
-#include "poller_tcp_unit_epoll.h"
+#include "poller_tcp_unit_epoll.h"
#endif
#include "poller_tcp_unit_select.h"
@@ -11,116 +11,116 @@
#include <library/cpp/actors/util/intrinsics.h>
#if defined _linux_
-#include <pthread.h>
+#include <pthread.h>
#endif
namespace NInterconnect {
- TPollerUnit::TPtr
- TPollerUnit::Make(bool useSelect) {
+ TPollerUnit::TPtr
+ TPollerUnit::Make(bool useSelect) {
#if defined(_win_) || defined(_darwin_)
- Y_UNUSED(useSelect);
- return TPtr(new TPollerUnitSelect);
+ Y_UNUSED(useSelect);
+ return TPtr(new TPollerUnitSelect);
#else
- return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll);
+ return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll);
#endif
- }
-
- TPollerUnit::TPollerUnit()
- : StopFlag(true)
- , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read"))
- , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write"))
- {
- }
-
- TPollerUnit::~TPollerUnit() {
- if (!AtomicLoad(&StopFlag))
- Stop();
- }
-
- void
- TPollerUnit::Start() {
- AtomicStore(&StopFlag, false);
- ReadLoop.Start();
- WriteLoop.Start();
- }
-
- void
- TPollerUnit::Stop() {
- AtomicStore(&StopFlag, true);
- ReadLoop.Join();
- WriteLoop.Join();
- }
-
- template <>
- TPollerUnit::TSide&
- TPollerUnit::GetSide<false>() {
- return Read;
- }
-
- template <>
- TPollerUnit::TSide&
- TPollerUnit::GetSide<true>() {
- return Write;
- }
-
- void
- TPollerUnit::StartReadOperation(
+ }
+
+ TPollerUnit::TPollerUnit()
+ : StopFlag(true)
+ , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read"))
+ , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write"))
+ {
+ }
+
+ TPollerUnit::~TPollerUnit() {
+ if (!AtomicLoad(&StopFlag))
+ Stop();
+ }
+
+ void
+ TPollerUnit::Start() {
+ AtomicStore(&StopFlag, false);
+ ReadLoop.Start();
+ WriteLoop.Start();
+ }
+
+ void
+ TPollerUnit::Stop() {
+ AtomicStore(&StopFlag, true);
+ ReadLoop.Join();
+ WriteLoop.Join();
+ }
+
+ template <>
+ TPollerUnit::TSide&
+ TPollerUnit::GetSide<false>() {
+ return Read;
+ }
+
+ template <>
+ TPollerUnit::TSide&
+ TPollerUnit::GetSide<true>() {
+ return Write;
+ }
+
+ void
+ TPollerUnit::StartReadOperation(
const TIntrusivePtr<TSharedDescriptor>& stream,
- TFDDelegate&& operation) {
- Y_VERIFY_DEBUG(stream);
- if (AtomicLoad(&StopFlag))
- return;
- GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
- }
-
- void
- TPollerUnit::StartWriteOperation(
+ TFDDelegate&& operation) {
+ Y_VERIFY_DEBUG(stream);
+ if (AtomicLoad(&StopFlag))
+ return;
+ GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
+ }
+
+ void
+ TPollerUnit::StartWriteOperation(
const TIntrusivePtr<TSharedDescriptor>& stream,
- TFDDelegate&& operation) {
- Y_VERIFY_DEBUG(stream);
- if (AtomicLoad(&StopFlag))
- return;
- GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
- }
-
- template <bool IsWrite>
- void*
- TPollerUnit::IdleThread(void* param) {
+ TFDDelegate&& operation) {
+ Y_VERIFY_DEBUG(stream);
+ if (AtomicLoad(&StopFlag))
+ return;
+ GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
+ }
+
+ template <bool IsWrite>
+ void*
+ TPollerUnit::IdleThread(void* param) {
// TODO: musl-libc version of `sched_param` struct is for some reason different from pthread
// version in Ubuntu 12.04
#if defined(_linux_) && !defined(_musl_)
- pthread_t threadSelf = pthread_self();
- sched_param sparam = {20};
- pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam);
+ pthread_t threadSelf = pthread_self();
+ sched_param sparam = {20};
+ pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam);
#endif
- static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>();
- return nullptr;
- }
+ static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>();
+ return nullptr;
+ }
- template <>
- void
- TPollerUnit::RunLoop<false>() {
+ template <>
+ void
+ TPollerUnit::RunLoop<false>() {
NProfiling::TMemoryTagScope tag("INTERCONNECT_RECEIVED_DATA");
- while (!AtomicLoad(&StopFlag))
- ProcessRead();
- }
+ while (!AtomicLoad(&StopFlag))
+ ProcessRead();
+ }
- template <>
- void
- TPollerUnit::RunLoop<true>() {
+ template <>
+ void
+ TPollerUnit::RunLoop<true>() {
NProfiling::TMemoryTagScope tag("INTERCONNECT_SEND_DATA");
- while (!AtomicLoad(&StopFlag))
- ProcessWrite();
- }
-
- void
- TPollerUnit::TSide::ProcessInput() {
- if (!InputQueue.IsEmpty())
- do {
- auto sock = InputQueue.Top().first->GetDescriptor();
- if (!Operations.emplace(sock, std::move(InputQueue.Top())).second)
- Y_FAIL("Descriptor is already in pooler.");
- } while (InputQueue.Pop());
- }
+ while (!AtomicLoad(&StopFlag))
+ ProcessWrite();
+ }
+
+ void
+ TPollerUnit::TSide::ProcessInput() {
+ if (!InputQueue.IsEmpty())
+ do {
+ auto sock = InputQueue.Top().first->GetDescriptor();
+ if (!Operations.emplace(sock, std::move(InputQueue.Top())).second)
+ Y_FAIL("Descriptor is already in pooler.");
+ } while (InputQueue.Pop());
+ }
}