aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-05-07 18:53:41 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-05-07 19:04:48 +0300
commit4170e3df16f6ca921e9004cf9d98be5484c36cef (patch)
tree1ed43222c62bfb899cb85a35c310eaf4a934309d
parent50ea5223965d9f51e893d37c4247bce108a6e9db (diff)
downloadydb-4170e3df16f6ca921e9004cf9d98be5484c36cef.tar.gz
Bunch of issues from babenko-issues tag
4a6fd6fb52fcb4d43f76651d645dc2e1affe3dd0
-rw-r--r--library/cpp/yt/misc/port.h4
-rw-r--r--yt/yt/core/bus/tcp/connection.cpp29
-rw-r--r--yt/yt/core/bus/tcp/connection.h2
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_pool.cpp4
-rw-r--r--yt/yt/core/net/connection.cpp124
-rw-r--r--yt/yt/core/net/connection.h25
-rw-r--r--yt/yt/core/net/packet_connection.h2
-rw-r--r--yt/yt/library/process/pipe.cpp10
-rw-r--r--yt/yt/library/process/pipe.h3
-rw-r--r--yt/yt/library/process/unittests/pipes_ut.cpp2
10 files changed, 89 insertions, 116 deletions
diff --git a/library/cpp/yt/misc/port.h b/library/cpp/yt/misc/port.h
index e30e163a59..fe1c5e96de 100644
--- a/library/cpp/yt/misc/port.h
+++ b/library/cpp/yt/misc/port.h
@@ -71,7 +71,7 @@
#endif
#if defined(_unix_)
- #define NO_UNIQUE_ADDRESS [[no_unique_address]]
+ #define YT_ATTRIBUTE_NO_UNIQUE_ADDRESS [[no_unique_address]]
#else
- #define NO_UNIQUE_ADDRESS
+ #define YT_ATTRIBUTE_NO_UNIQUE_ADDRESS
#endif
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp
index 8e29b6f056..f0acaaca92 100644
--- a/yt/yt/core/bus/tcp/connection.cpp
+++ b/yt/yt/core/bus/tcp/connection.cpp
@@ -196,7 +196,10 @@ void TTcpConnection::Close()
EncodedFragments_.clear();
- FlushStatistics(/*guarded*/ false);
+ {
+ auto guard = Guard(Lock_);
+ FlushStatistics();
+ }
}
void TTcpConnection::Start()
@@ -257,7 +260,10 @@ void TTcpConnection::RunPeriodicCheck()
return;
}
- FlushStatistics(/*guarded*/ false);
+ {
+ auto guard = Guard(Lock_);
+ FlushStatistics();
+ }
auto now = NProfiling::GetCpuInstant();
@@ -423,7 +429,7 @@ void TTcpConnection::Open(TGuard<NThreading::TSpinLock>& guard)
}
UpdateConnectionCount(+1);
- FlushStatistics(/*guarded*/ true);
+ FlushStatistics();
// Go online and start event processing.
auto previousPendingControl = static_cast<EPollControl>(PendingControl_.fetch_and(~static_cast<ui64>(EPollControl::Offline)));
@@ -1787,20 +1793,11 @@ void TTcpConnection::InitSocketTosLevel(TTosLevel tosLevel)
}
}
-void TTcpConnection::FlushStatistics(bool guarded)
+void TTcpConnection::FlushStatistics()
{
- auto doFlush = [&] {
- UpdateTcpStatistics();
- FlushBusStatistics();
- };
-
- if (guarded) {
- doFlush();
- return;
- }
-
- auto guard = Guard(Lock_);
- doFlush();
+ VERIFY_SPINLOCK_AFFINITY(Lock_);
+ UpdateTcpStatistics();
+ FlushBusStatistics();
}
template <class T, class U>
diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h
index 5cab9e591b..81e2c4a3a0 100644
--- a/yt/yt/core/bus/tcp/connection.h
+++ b/yt/yt/core/bus/tcp/connection.h
@@ -359,7 +359,7 @@ private:
void IncrementPendingOut(i64 packetSize);
void DecrementPendingOut(i64 packetSize);
- void FlushStatistics(bool guarded = false);
+ void FlushStatistics();
template <class T, class U>
i64 UpdateBusCounter(T TBusNetworkBandCounters::* field, U delta);
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
index 46d4845f40..372b8f4977 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
@@ -469,7 +469,7 @@ private:
i64 DequeuedActionCount_ = 0;
i64 ExecutedActionCount_ = 0;
- NO_UNIQUE_ADDRESS THandle ProfilerHandle_;
+ YT_ATTRIBUTE_NO_UNIQUE_ADDRESS THandle ProfilerHandle_;
TDuration GetTotalTimeEstimate(TInstant now) const
{
@@ -496,7 +496,7 @@ private:
IFairShareCallbackQueuePtr Queue_;
- NO_UNIQUE_ADDRESS TPoolProfilerObject Profiler_;
+ YT_ATTRIBUTE_NO_UNIQUE_ADDRESS TPoolProfilerObject Profiler_;
class TCpuTimeAccounter
{
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index 29d18fef40..a88cbd2a13 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -330,9 +330,9 @@ class TDeliveryFencedWriteOperation
: public TWriteOperation
{
public:
- TDeliveryFencedWriteOperation(const TSharedRef& buffer, const TString& pipePath)
+ TDeliveryFencedWriteOperation(const TSharedRef& buffer, TString pipePath)
: TWriteOperation(buffer)
- , PipePath_(pipePath)
+ , PipePath_(std::move(pipePath))
{ }
TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override
@@ -352,7 +352,7 @@ public:
}
private:
- TString PipePath_;
+ const TString PipePath_;
bool IsWriteComplete(const TErrorOr<TIOResult>& result)
{
@@ -482,11 +482,11 @@ class TFDConnectionImpl
public:
static TFDConnectionImplPtr Create(
TFileDescriptor fd,
- const TString& filePath,
- const IPollerPtr& poller,
+ TString filePath,
+ IPollerPtr poller,
bool useDeliveryFence)
{
- auto impl = New<TFDConnectionImpl>(fd, filePath, poller, useDeliveryFence);
+ auto impl = New<TFDConnectionImpl>(fd, std::move(filePath), std::move(poller), useDeliveryFence);
impl->Init();
return impl;
}
@@ -495,9 +495,9 @@ public:
TFileDescriptor fd,
const TNetworkAddress& localAddress,
const TNetworkAddress& remoteAddress,
- const IPollerPtr& poller)
+ IPollerPtr poller)
{
- auto impl = New<TFDConnectionImpl>(fd, localAddress, remoteAddress, poller);
+ auto impl = New<TFDConnectionImpl>(fd, localAddress, remoteAddress, std::move(poller));
impl->Init();
return impl;
}
@@ -625,22 +625,6 @@ public:
return DoWrite(data);
}
- TFuture<void> DoWrite(const TSharedRef& data)
- {
- auto write = std::make_unique<TWriteOperation>(data);
- auto future = write->ToFuture();
- StartIO(&WriteDirection_, std::move(write));
- return future;
- }
-
- TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data)
- {
- auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_);
- auto future = syncWrite->ToFuture();
- StartIO(&WriteDirection_, std::move(syncWrite));
- return future;
- }
-
TFuture<void> WriteV(const TSharedRefArray& data)
{
auto writeV = std::make_unique<TWriteVOperation>(data);
@@ -783,32 +767,32 @@ private:
// which future is set only
// after data from pipe has been read.
const bool UseDeliveryFence_ = false;
- const TString PipePath_ = {};
+ const TString PipePath_;
TFDConnectionImpl(
TFileDescriptor fd,
- const TString& filePath,
+ TString filePath,
const IPollerPtr& poller,
bool useDeliveryFence)
: Name_(Format("File{%v}", filePath))
, FD_(fd)
- , Poller_(poller)
+ , Poller_(std::move(poller))
, UseDeliveryFence_(useDeliveryFence)
- , PipePath_(filePath)
+ , PipePath_(std::move(filePath))
{ }
TFDConnectionImpl(
TFileDescriptor fd,
const TNetworkAddress& localAddress,
const TNetworkAddress& remoteAddress,
- const IPollerPtr& poller)
+ IPollerPtr poller)
: Name_(Format("FD{%v<->%v}", localAddress, remoteAddress))
, LoggingTag_(Format("ConnectionId: %v", Name_))
, LocalAddress_(localAddress)
, RemoteAddress_(remoteAddress)
, FD_(fd)
- , Poller_(poller)
+ , Poller_(std::move(poller))
{ }
DECLARE_NEW_FRIEND()
@@ -917,6 +901,22 @@ private:
TDelayedExecutorCookie ReadTimeoutCookie_;
TDelayedExecutorCookie WriteTimeoutCookie_;
+ TFuture<void> DoWrite(const TSharedRef& data)
+ {
+ auto write = std::make_unique<TWriteOperation>(data);
+ auto future = write->ToFuture();
+ StartIO(&WriteDirection_, std::move(write));
+ return future;
+ }
+
+ TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data)
+ {
+ auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_);
+ auto future = syncWrite->ToFuture();
+ StartIO(&WriteDirection_, std::move(syncWrite));
+ return future;
+ }
+
void Init()
{
AbortFromReadTimeout_ = BIND(&TFDConnectionImpl::AbortFromReadTimeout, MakeWeak(this));
@@ -1127,11 +1127,11 @@ class TFDConnection
public:
TFDConnection(
TFileDescriptor fd,
- const TString& pipePath,
- const IPollerPtr& poller,
+ TString pipePath,
+ IPollerPtr poller,
TRefCountedPtr pipeHolder = nullptr,
bool useDeliveryFence = false)
- : Impl_(TFDConnectionImpl::Create(fd, pipePath, poller, useDeliveryFence))
+ : Impl_(TFDConnectionImpl::Create(fd, std::move(pipePath), std::move(poller), useDeliveryFence))
, PipeHolder_(std::move(pipeHolder))
{ }
@@ -1139,8 +1139,8 @@ public:
TFileDescriptor fd,
const TNetworkAddress& localAddress,
const TNetworkAddress& remoteAddress,
- const IPollerPtr& poller)
- : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, poller))
+ IPollerPtr poller)
+ : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller)))
{ }
~TFDConnection()
@@ -1290,7 +1290,7 @@ TFileDescriptor CreateWriteFDForConnection(
////////////////////////////////////////////////////////////////////////////////
-std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const IPollerPtr& poller)
+std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(IPollerPtr poller)
{
SOCKET fds[2];
@@ -1320,7 +1320,7 @@ std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const IPollerPtr&
auto address1 = GetSocketName(fds[1]);
auto first = New<TFDConnection>(fds[0], address0, address1, poller);
- auto second = New<TFDConnection>(fds[1], address1, address0, poller);
+ auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller));
return std::pair(std::move(first), std::move(second));
} catch (...) {
YT_VERIFY(TryClose(fds[0], false));
@@ -1333,23 +1333,23 @@ IConnectionPtr CreateConnectionFromFD(
TFileDescriptor fd,
const TNetworkAddress& localAddress,
const TNetworkAddress& remoteAddress,
- const IPollerPtr& poller)
+ IPollerPtr poller)
{
- return New<TFDConnection>(fd, localAddress, remoteAddress, poller);
+ return New<TFDConnection>(fd, localAddress, remoteAddress, std::move(poller));
}
IConnectionReaderPtr CreateInputConnectionFromFD(
TFileDescriptor fd,
- const TString& pipePath,
- const IPollerPtr& poller,
+ TString pipePath,
+ IPollerPtr poller,
const TRefCountedPtr& pipeHolder)
{
- return New<TFDConnection>(fd, pipePath, poller, pipeHolder);
+ return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder);
}
IConnectionReaderPtr CreateInputConnectionFromPath(
- const TString& pipePath,
- const IPollerPtr& poller,
+ TString pipePath,
+ IPollerPtr poller,
const TRefCountedPtr& pipeHolder)
{
#ifdef _unix_
@@ -1361,37 +1361,25 @@ IConnectionReaderPtr CreateInputConnectionFromPath(
<< TErrorAttribute("path", pipePath);
}
- return New<TFDConnection>(fd, pipePath, poller, pipeHolder);
+ return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder);
#else
THROW_ERROR_EXCEPTION("Unsupported platform");
#endif
}
IConnectionWriterPtr CreateOutputConnectionFromPath(
- const TString& pipePath,
- const IPollerPtr& poller,
+ TString pipePath,
+ IPollerPtr poller,
const TRefCountedPtr& pipeHolder,
- std::optional<int> capacity)
-{
- return New<TFDConnection>(
- CreateWriteFDForConnection(pipePath, capacity),
- pipePath,
- poller,
- pipeHolder);
-}
-
-IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath(
- const TString& pipePath,
- const NConcurrency::IPollerPtr& poller,
- const TRefCountedPtr& pipeHolder,
- std::optional<int> capacity)
+ std::optional<int> capacity,
+ bool useDeliveryFence)
{
return New<TFDConnection>(
CreateWriteFDForConnection(pipePath, capacity),
- pipePath,
- poller,
+ std::move(pipePath),
+ std::move(poller),
pipeHolder,
- /*useDeliveryFence*/ true);
+ useDeliveryFence);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1403,8 +1391,8 @@ public:
TPacketConnection(
TFileDescriptor fd,
const TNetworkAddress& localAddress,
- const IPollerPtr& poller)
- : Impl_(TFDConnectionImpl::Create(fd, localAddress, TNetworkAddress{}, poller))
+ IPollerPtr poller)
+ : Impl_(TFDConnectionImpl::Create(fd, localAddress, TNetworkAddress{}, std::move(poller)))
{ }
~TPacketConnection()
@@ -1434,7 +1422,7 @@ private:
IPacketConnectionPtr CreatePacketConnection(
const TNetworkAddress& at,
- const NConcurrency::IPollerPtr& poller)
+ NConcurrency::IPollerPtr poller)
{
auto fd = CreateUdpSocket();
try {
@@ -1445,7 +1433,7 @@ IPacketConnectionPtr CreatePacketConnection(
throw;
}
- return New<TPacketConnection>(fd, at, poller);
+ return New<TPacketConnection>(fd, at, std::move(poller));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h
index 610481689f..24b28872d5 100644
--- a/yt/yt/core/net/connection.h
+++ b/yt/yt/core/net/connection.h
@@ -88,37 +88,32 @@ DEFINE_REFCOUNTED_TYPE(IConnection)
////////////////////////////////////////////////////////////////////////////////
-std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(const NConcurrency::IPollerPtr& poller);
+std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(NConcurrency::IPollerPtr poller);
//! File descriptor must be in nonblocking mode.
IConnectionPtr CreateConnectionFromFD(
TFileDescriptor fd,
const TNetworkAddress& localAddress,
const TNetworkAddress& remoteAddress,
- const NConcurrency::IPollerPtr& poller);
+ NConcurrency::IPollerPtr poller);
IConnectionReaderPtr CreateInputConnectionFromFD(
TFileDescriptor fd,
- const TString& pipePath,
- const NConcurrency::IPollerPtr& poller,
+ TString pipePath,
+ NConcurrency::IPollerPtr poller,
const TRefCountedPtr& pipeHolder);
IConnectionReaderPtr CreateInputConnectionFromPath(
- const TString& pipePath,
- const NConcurrency::IPollerPtr& poller,
+ TString pipePath,
+ NConcurrency::IPollerPtr poller,
const TRefCountedPtr& pipeHolder);
IConnectionWriterPtr CreateOutputConnectionFromPath(
- const TString& pipePath,
- const NConcurrency::IPollerPtr& poller,
+ TString pipePath,
+ NConcurrency::IPollerPtr poller,
const TRefCountedPtr& pipeHolder,
- std::optional<int> capacity = {});
-
-IConnectionWriterPtr CreateDeliveryFencedOutputConnectionFromPath(
- const TString& pipePath,
- const NConcurrency::IPollerPtr& poller,
- const TRefCountedPtr& pipeHolder,
- std::optional<int> capacity = {});
+ std::optional<int> capacity = {},
+ bool useDeliveryFence = false);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/packet_connection.h b/yt/yt/core/net/packet_connection.h
index a4450df1e3..d6de44d5aa 100644
--- a/yt/yt/core/net/packet_connection.h
+++ b/yt/yt/core/net/packet_connection.h
@@ -27,7 +27,7 @@ DEFINE_REFCOUNTED_TYPE(IPacketConnection)
IPacketConnectionPtr CreatePacketConnection(
const TNetworkAddress& at,
- const NConcurrency::IPollerPtr& poller);
+ NConcurrency::IPollerPtr poller);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/process/pipe.cpp b/yt/yt/library/process/pipe.cpp
index a2efaac44b..c35d82b2c9 100644
--- a/yt/yt/library/process/pipe.cpp
+++ b/yt/yt/library/process/pipe.cpp
@@ -64,16 +64,10 @@ IConnectionReaderPtr TNamedPipe::CreateAsyncReader()
return CreateInputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this));
}
-IConnectionWriterPtr TNamedPipe::CreateAsyncWriter()
+IConnectionWriterPtr TNamedPipe::CreateAsyncWriter(bool useDeliveryFence)
{
YT_VERIFY(!Path_.empty());
- return CreateOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_);
-}
-
-IConnectionWriterPtr TNamedPipe::CreateDeliveryFencedAsyncWriter()
-{
- YT_VERIFY(!Path_.empty());
- return CreateDeliveryFencedOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_);
+ return CreateOutputConnectionFromPath(Path_, TIODispatcher::Get()->GetPoller(), MakeStrong(this), Capacity_, useDeliveryFence);
}
TString TNamedPipe::GetPath() const
diff --git a/yt/yt/library/process/pipe.h b/yt/yt/library/process/pipe.h
index 7e0b518fc7..1a827ac977 100644
--- a/yt/yt/library/process/pipe.h
+++ b/yt/yt/library/process/pipe.h
@@ -19,8 +19,7 @@ public:
static TNamedPipePtr FromPath(const TString& path);
NNet::IConnectionReaderPtr CreateAsyncReader();
- NNet::IConnectionWriterPtr CreateAsyncWriter();
- NNet::IConnectionWriterPtr CreateDeliveryFencedAsyncWriter();
+ NNet::IConnectionWriterPtr CreateAsyncWriter(bool useDeliveryFence = false);
TString GetPath() const;
diff --git a/yt/yt/library/process/unittests/pipes_ut.cpp b/yt/yt/library/process/unittests/pipes_ut.cpp
index af851958a0..b41b7fc595 100644
--- a/yt/yt/library/process/unittests/pipes_ut.cpp
+++ b/yt/yt/library/process/unittests/pipes_ut.cpp
@@ -180,7 +180,7 @@ protected:
{
auto pipe = TNamedPipe::Create("./namedpipewcap", 0660);
Reader = pipe->CreateAsyncReader();
- Writer = pipe->CreateDeliveryFencedAsyncWriter();
+ Writer = pipe->CreateAsyncWriter(/*useDeliveryFence*/ true);
}
IConnectionReaderPtr Reader;