diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-05-07 18:53:41 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-05-07 19:04:48 +0300 |
commit | 4170e3df16f6ca921e9004cf9d98be5484c36cef (patch) | |
tree | 1ed43222c62bfb899cb85a35c310eaf4a934309d | |
parent | 50ea5223965d9f51e893d37c4247bce108a6e9db (diff) | |
download | ydb-4170e3df16f6ca921e9004cf9d98be5484c36cef.tar.gz |
Bunch of issues from babenko-issues tag
4a6fd6fb52fcb4d43f76651d645dc2e1affe3dd0
-rw-r--r-- | library/cpp/yt/misc/port.h | 4 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/connection.cpp | 29 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/connection.h | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fair_share_invoker_pool.cpp | 4 | ||||
-rw-r--r-- | yt/yt/core/net/connection.cpp | 124 | ||||
-rw-r--r-- | yt/yt/core/net/connection.h | 25 | ||||
-rw-r--r-- | yt/yt/core/net/packet_connection.h | 2 | ||||
-rw-r--r-- | yt/yt/library/process/pipe.cpp | 10 | ||||
-rw-r--r-- | yt/yt/library/process/pipe.h | 3 | ||||
-rw-r--r-- | yt/yt/library/process/unittests/pipes_ut.cpp | 2 |
10 files changed, 89 insertions, 116 deletions
diff --git a/library/cpp/yt/misc/port.h b/library/cpp/yt/misc/port.h index e30e163a59..fe1c5e96de 100644 --- a/library/cpp/yt/misc/port.h +++ b/library/cpp/yt/misc/port.h @@ -71,7 +71,7 @@ #endif #if defined(_unix_) - #define NO_UNIQUE_ADDRESS [[no_unique_address]] + #define YT_ATTRIBUTE_NO_UNIQUE_ADDRESS [[no_unique_address]] #else - #define NO_UNIQUE_ADDRESS + #define YT_ATTRIBUTE_NO_UNIQUE_ADDRESS #endif diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index 8e29b6f056..f0acaaca92 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -196,7 +196,10 @@ void TTcpConnection::Close() EncodedFragments_.clear(); - FlushStatistics(/*guarded*/ false); + { + auto guard = Guard(Lock_); + FlushStatistics(); + } } void TTcpConnection::Start() @@ -257,7 +260,10 @@ void TTcpConnection::RunPeriodicCheck() return; } - FlushStatistics(/*guarded*/ false); + { + auto guard = Guard(Lock_); + FlushStatistics(); + } auto now = NProfiling::GetCpuInstant(); @@ -423,7 +429,7 @@ void TTcpConnection::Open(TGuard<NThreading::TSpinLock>& guard) } UpdateConnectionCount(+1); - FlushStatistics(/*guarded*/ true); + FlushStatistics(); // Go online and start event processing. auto previousPendingControl = static_cast<EPollControl>(PendingControl_.fetch_and(~static_cast<ui64>(EPollControl::Offline))); @@ -1787,20 +1793,11 @@ void TTcpConnection::InitSocketTosLevel(TTosLevel tosLevel) } } -void TTcpConnection::FlushStatistics(bool guarded) +void TTcpConnection::FlushStatistics() { - auto doFlush = [&] { - UpdateTcpStatistics(); - FlushBusStatistics(); - }; - - if (guarded) { - doFlush(); - return; - } - - auto guard = Guard(Lock_); - doFlush(); + VERIFY_SPINLOCK_AFFINITY(Lock_); + UpdateTcpStatistics(); + FlushBusStatistics(); } template <class T, class U> diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index 5cab9e591b..81e2c4a3a0 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -359,7 +359,7 @@ private: void IncrementPendingOut(i64 packetSize); void DecrementPendingOut(i64 packetSize); - void FlushStatistics(bool guarded = false); + void FlushStatistics(); template <class T, class U> i64 UpdateBusCounter(T TBusNetworkBandCounters::* field, U delta); diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp index 46d4845f40..372b8f4977 100644 --- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp @@ -469,7 +469,7 @@ private: i64 DequeuedActionCount_ = 0; i64 ExecutedActionCount_ = 0; - NO_UNIQUE_ADDRESS THandle ProfilerHandle_; + YT_ATTRIBUTE_NO_UNIQUE_ADDRESS THandle ProfilerHandle_; TDuration GetTotalTimeEstimate(TInstant now) const { @@ -496,7 +496,7 @@ private: IFairShareCallbackQueuePtr Queue_; - NO_UNIQUE_ADDRESS TPoolProfilerObject Profiler_; + YT_ATTRIBUTE_NO_UNIQUE_ADDRESS TPoolProfilerObject Profiler_; class TCpuTimeAccounter { diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index 29d18fef40..a88cbd2a13 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -330,9 +330,9 @@ class TDeliveryFencedWriteOperation : public TWriteOperation { public: - TDeliveryFencedWriteOperation(const TSharedRef& buffer, const TString& pipePath) + TDeliveryFencedWriteOperation(const TSharedRef& buffer, TString pipePath) : TWriteOperation(buffer) - , PipePath_(pipePath) + , PipePath_(std::move(pipePath)) { } TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override @@ -352,7 +352,7 @@ public: } private: - TString PipePath_; + const TString PipePath_; bool IsWriteComplete(const TErrorOr<TIOResult>& result) { @@ -482,11 +482,11 @@ class TFDConnectionImpl public: static TFDConnectionImplPtr Create( TFileDescriptor fd, - const TString& filePath, - const IPollerPtr& poller, + TString filePath, + IPollerPtr poller, bool useDeliveryFence) { - auto impl = New<TFDConnectionImpl>(fd, filePath, poller, useDeliveryFence); + auto impl = New<TFDConnectionImpl>(fd, std::move(filePath), std::move(poller), useDeliveryFence); impl->Init(); return impl; } @@ -495,9 +495,9 @@ public: TFileDescriptor fd, const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, - const IPollerPtr& poller) + IPollerPtr poller) { - auto impl = New<TFDConnectionImpl>(fd, localAddress, remoteAddress, poller); + auto impl = New<TFDConnectionImpl>(fd, localAddress, remoteAddress, std::move(poller)); impl->Init(); return impl; } @@ -625,22 +625,6 @@ public: return DoWrite(data); } - TFuture<void> DoWrite(const TSharedRef& data) - { - auto write = std::make_unique<TWriteOperation>(data); - auto future = write->ToFuture(); - StartIO(&WriteDirection_, std::move(write)); - return future; - } - - TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data) - { - auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_); - auto future = syncWrite->ToFuture(); - StartIO(&WriteDirection_, std::move(syncWrite)); - return future; - } - TFuture<void> WriteV(const TSharedRefArray& data) { auto writeV = std::make_unique<TWriteVOperation>(data); @@ -783,32 +767,32 @@ private: // which future is set only // after data from pipe has been read. const bool UseDeliveryFence_ = false; - const TString PipePath_ = {}; + const TString PipePath_; TFDConnectionImpl( TFileDescriptor fd, - const TString& filePath, + TString filePath, const IPollerPtr& poller, bool useDeliveryFence) : Name_(Format("File{%v}", filePath)) , FD_(fd) - , Poller_(poller) + , Poller_(std::move(poller)) , UseDeliveryFence_(useDeliveryFence) - , PipePath_(filePath) + , PipePath_(std::move(filePath)) { } TFDConnectionImpl( TFileDescriptor fd, const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, - const IPollerPtr& poller) + IPollerPtr poller) : Name_(Format("FD{%v<->%v}", localAddress, remoteAddress)) , LoggingTag_(Format("ConnectionId: %v", Name_)) , LocalAddress_(localAddress) , RemoteAddress_(remoteAddress) , FD_(fd) - , Poller_(poller) + , Poller_(std::move(poller)) { } DECLARE_NEW_FRIEND() @@ -917,6 +901,22 @@ private: TDelayedExecutorCookie ReadTimeoutCookie_; TDelayedExecutorCookie WriteTimeoutCookie_; + TFuture<void> DoWrite(const TSharedRef& data) + { + auto write = std::make_unique<TWriteOperation>(data); + auto future = write->ToFuture(); + StartIO(&WriteDirection_, std::move(write)); + return future; + } + + TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data) + { + auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_); + auto future = syncWrite->ToFuture(); + StartIO(&WriteDirection_, std::move(syncWrite)); + return future; + } + void Init() { AbortFromReadTimeout_ = BIND(&TFDConnectionImpl::AbortFromReadTimeout, MakeWeak(this)); @@ -1127,11 +1127,11 @@ class TFDConnection public: TFDConnection( TFileDescriptor fd, - const TString& pipePath, - const IPollerPtr& poller, + TString pipePath, + IPollerPtr poller, TRefCountedPtr pipeHolder = nullptr, bool useDeliveryFence = false) - : Impl_(TFDConnectionImpl::Create(fd, pipePath, poller, useDeliveryFence)) + : Impl_(TFDConnectionImpl::Create(fd, std::move(pipePath), std::move(poller), useDeliveryFence)) , PipeHolder_(std::move(pipeHolder)) { } @@ -1139,8 +1139,8 @@ public: TFileDescriptor fd, const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, - const IPollerPtr& poller) - : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, poller)) + IPollerPtr poller) + : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller))) { } ~TFDConnection() @@ -1290,7 +1290,7 @@ TFileDescriptor CreateWriteFDForConnection( //////////////////////////////////////////////////////////////////////////////// -std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const IPollerPtr& poller) +std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(IPollerPtr poller) { SOCKET fds[2]; @@ -1320,7 +1320,7 @@ std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const IPollerPtr& auto address1 = GetSocketName(fds[1]); auto first = New<TFDConnection>(fds[0], address0, address1, poller); - auto second = New<TFDConnection>(fds[1], address1, address0, poller); + auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller)); return std::pair(std::move(first), std::move(second)); } catch (...) { YT_VERIFY(TryClose(fds[0], false)); @@ -1333,23 +1333,23 @@ IConnectionPtr CreateConnectionFromFD( TFileDescriptor fd, const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, - const IPollerPtr& poller) + IPollerPtr poller) { - return New<TFDConnection>(fd, localAddress, remoteAddress, poller); + return New<TFDConnection>(fd, localAddress, remoteAddress, std::move(poller)); } IConnectionReaderPtr CreateInputConnectionFromFD( TFileDescriptor fd, - const TString& pipePath, - const IPollerPtr& poller, + TString pipePath, + IPollerPtr poller, const TRefCountedPtr& pipeHolder) { - return New<TFDConnection>(fd, pipePath, poller, pipeHolder); + return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder); } IConnectionReaderPtr CreateInputConnectionFromPath( - const TString& pipePath, - const IPollerPtr& poller, + TString pipePath, + IPollerPtr poller, const TRefCountedPtr& pipeHolder) { #ifdef _unix_ @@ -1361,37 +1361,25 @@ IConnectionReaderPtr CreateInputConnectionFromPath( << TErrorAttribute("path", pipePath); } - return New<TFDConnection>(fd, pipePath, poller, pipeHolder); + return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder); #else THROW_ERROR_EXCEPTION("Unsupported platform"); #endif } IConnectionWriterPtr CreateOutputConnectionFromPath( - const TString& pipePath, - const IPollerPtr& poller, + TString pipePath, + IPollerPtr poller, const TRefCountedPtr& pipeHolder, - std::optional<int> capacity) -{ - return New<TFDConnection>( - CreateWriteFDForConnection(pipePath, capacity), - pipePath, - poller, - pipeHolder); -} - -IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath( - const TString& pipePath, - const NConcurrency::IPollerPtr& poller, - const TRefCountedPtr& pipeHolder, - std::optional<int> capacity) + std::optional<int> capacity, + bool useDeliveryFence) { return New<TFDConnection>( CreateWriteFDForConnection(pipePath, capacity), - pipePath, - poller, + std::move(pipePath), + std::move(poller), pipeHolder, - /*useDeliveryFence*/ true); + useDeliveryFence); } //////////////////////////////////////////////////////////////////////////////// @@ -1403,8 +1391,8 @@ public: TPacketConnection( TFileDescriptor fd, const TNetworkAddress& localAddress, - const IPollerPtr& poller) - : Impl_(TFDConnectionImpl::Create(fd, localAddress, TNetworkAddress{}, poller)) + IPollerPtr poller) + : Impl_(TFDConnectionImpl::Create(fd, localAddress, TNetworkAddress{}, std::move(poller))) { } ~TPacketConnection() @@ -1434,7 +1422,7 @@ private: IPacketConnectionPtr CreatePacketConnection( const TNetworkAddress& at, - const NConcurrency::IPollerPtr& poller) + NConcurrency::IPollerPtr poller) { auto fd = CreateUdpSocket(); try { @@ -1445,7 +1433,7 @@ IPacketConnectionPtr CreatePacketConnection( throw; } - return New<TPacketConnection>(fd, at, poller); + return New<TPacketConnection>(fd, at, std::move(poller)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index 610481689f..24b28872d5 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -88,37 +88,32 @@ DEFINE_REFCOUNTED_TYPE(IConnection) //////////////////////////////////////////////////////////////////////////////// -std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const NConcurrency::IPollerPtr& poller); +std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(NConcurrency::IPollerPtr poller); //! File descriptor must be in nonblocking mode. IConnectionPtr CreateConnectionFromFD( TFileDescriptor fd, const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, - const NConcurrency::IPollerPtr& poller); + NConcurrency::IPollerPtr poller); IConnectionReaderPtr CreateInputConnectionFromFD( TFileDescriptor fd, - const TString& pipePath, - const NConcurrency::IPollerPtr& poller, + TString pipePath, + NConcurrency::IPollerPtr poller, const TRefCountedPtr& pipeHolder); IConnectionReaderPtr CreateInputConnectionFromPath( - const TString& pipePath, - const NConcurrency::IPollerPtr& poller, + TString pipePath, + NConcurrency::IPollerPtr poller, const TRefCountedPtr& pipeHolder); IConnectionWriterPtr CreateOutputConnectionFromPath( - const TString& pipePath, - const NConcurrency::IPollerPtr& poller, + TString pipePath, + NConcurrency::IPollerPtr poller, const TRefCountedPtr& pipeHolder, - std::optional<int> capacity = {}); - -IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath( - const TString& pipePath, - const NConcurrency::IPollerPtr& poller, - const TRefCountedPtr& pipeHolder, - std::optional<int> capacity = {}); + std::optional<int> capacity = {}, + bool useDeliveryFence = false); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/packet_connection.h b/yt/yt/core/net/packet_connection.h index a4450df1e3..d6de44d5aa 100644 --- a/yt/yt/core/net/packet_connection.h +++ b/yt/yt/core/net/packet_connection.h @@ -27,7 +27,7 @@ DEFINE_REFCOUNTED_TYPE(IPacketConnection) IPacketConnectionPtr CreatePacketConnection( const TNetworkAddress& at, - const NConcurrency::IPollerPtr& poller); + NConcurrency::IPollerPtr poller); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/process/pipe.cpp b/yt/yt/library/process/pipe.cpp index a2efaac44b..c35d82b2c9 100644 --- a/yt/yt/library/process/pipe.cpp +++ b/yt/yt/library/process/pipe.cpp @@ -64,16 +64,10 @@ IConnectionReaderPtr TNamedPipe::CreateAsyncReader() return CreateInputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this)); } -IConnectionWriterPtr TNamedPipe::CreateAsyncWriter() +IConnectionWriterPtr TNamedPipe::CreateAsyncWriter(bool useDeliveryFence) { YT_VERIFY(!Path_.empty()); - return CreateOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_); -} - -IConnectionWriterPtr TNamedPipe::CreateDeliveryFencedAsyncWriter() -{ - YT_VERIFY(!Path_.empty()); - return CreateDeliveryFencedOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_); + return CreateOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_, useDeliveryFence); } TString TNamedPipe::GetPath() const diff --git a/yt/yt/library/process/pipe.h b/yt/yt/library/process/pipe.h index 7e0b518fc7..1a827ac977 100644 --- a/yt/yt/library/process/pipe.h +++ b/yt/yt/library/process/pipe.h @@ -19,8 +19,7 @@ public: static TNamedPipePtr FromPath(const TString& path); NNet::IConnectionReaderPtr CreateAsyncReader(); - NNet::IConnectionWriterPtr CreateAsyncWriter(); - NNet::IConnectionWriterPtr CreateDeliveryFencedAsyncWriter(); + NNet::IConnectionWriterPtr CreateAsyncWriter(bool useDeliveryFence = false); TString GetPath() const; diff --git a/yt/yt/library/process/unittests/pipes_ut.cpp b/yt/yt/library/process/unittests/pipes_ut.cpp index af851958a0..b41b7fc595 100644 --- a/yt/yt/library/process/unittests/pipes_ut.cpp +++ b/yt/yt/library/process/unittests/pipes_ut.cpp @@ -180,7 +180,7 @@ protected: { auto pipe = TNamedPipe::Create("./namedpipewcap", 0660); Reader = pipe->CreateAsyncReader(); - Writer = pipe->CreateDeliveryFencedAsyncWriter(); + Writer = pipe->CreateAsyncWriter(/*useDeliveryFence*/ true); } IConnectionReaderPtr Reader; |