summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <[email protected]>2024-09-29 20:20:04 +0300
committerbabenko <[email protected]>2024-09-29 20:38:25 +0300
commita03aa200c67eac6745443ff0f9478435813a21af (patch)
tree9a2f2d0215441b283925536946a49c0ebf40d7be
parentcdfed3d3f13a1dc7b12a69216e15e04137c19c6a (diff)
Cosmetics and better logging in NNet connections
commit_hash:b9f0efe7c433ec6af3b87cf9f7e53eb681cb07e3
-rw-r--r--yt/yt/core/net/connection.cpp138
-rw-r--r--yt/yt/core/net/connection.h2
2 files changed, 71 insertions, 69 deletions
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index fa0c88d4d29..af458417ea8 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -6,6 +6,8 @@
#include <yt/yt/core/misc/finally.h>
+#include <yt/yt/core/actions/signal.h>
+
#include <yt/yt/core/net/socket.h>
#include <library/cpp/yt/system/handle_eintr.h>
@@ -37,7 +39,6 @@
namespace NYT::NNet {
using namespace NConcurrency;
-// using namespace NProfiling;
#ifdef _unix_
using TIOVecBasePtr = void*;
@@ -159,8 +160,8 @@ class TReadOperation
: public IIOOperation
{
public:
- explicit TReadOperation(const TSharedMutableRef& buffer)
- : Buffer_(buffer)
+ explicit TReadOperation(TSharedMutableRef buffer)
+ : Buffer_(std::move(buffer))
{ }
TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override
@@ -205,10 +206,10 @@ public:
}
private:
- TSharedMutableRef Buffer_;
- size_t Position_ = 0;
+ const TSharedMutableRef Buffer_;
+ const TPromise<size_t> ResultPromise_ = NewPromise<size_t>();
- TPromise<size_t> ResultPromise_ = NewPromise<size_t>();
+ size_t Position_ = 0;
};
////////////////////////////////////////////////////////////////////////////////
@@ -217,8 +218,8 @@ class TReceiveFromOperation
: public IIOOperation
{
public:
- explicit TReceiveFromOperation(const TSharedMutableRef& buffer)
- : Buffer_(buffer)
+ explicit TReceiveFromOperation(TSharedMutableRef buffer)
+ : Buffer_(std::move(buffer))
{ }
TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override
@@ -262,11 +263,11 @@ public:
}
private:
- TSharedMutableRef Buffer_;
+ const TSharedMutableRef Buffer_;
+ const TPromise<std::pair<size_t, TNetworkAddress>> ResultPromise_ = NewPromise<std::pair<size_t, TNetworkAddress>>();
+
size_t Position_ = 0;
TNetworkAddress RemoteAddress_;
-
- TPromise<std::pair<size_t, TNetworkAddress>> ResultPromise_ = NewPromise<std::pair<size_t, TNetworkAddress>>();
};
////////////////////////////////////////////////////////////////////////////////
@@ -275,8 +276,8 @@ class TWriteOperation
: public IIOOperation
{
public:
- explicit TWriteOperation(const TSharedRef& buffer)
- : Buffer_(buffer)
+ explicit TWriteOperation(TSharedRef buffer)
+ : Buffer_(std::move(buffer))
{ }
TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override
@@ -318,10 +319,10 @@ public:
}
private:
- TSharedRef Buffer_;
- size_t Position_ = 0;
+ const TSharedRef Buffer_;
+ const TPromise<void> ResultPromise_ = NewPromise<void>();
- TPromise<void> ResultPromise_ = NewPromise<void>();
+ size_t Position_ = 0;
};
////////////////////////////////////////////////////////////////////////////////
@@ -330,8 +331,8 @@ class TDeliveryFencedWriteOperation
: public TWriteOperation
{
public:
- TDeliveryFencedWriteOperation(const TSharedRef& buffer, TString pipePath)
- : TWriteOperation(buffer)
+ TDeliveryFencedWriteOperation(TSharedRef buffer, TString pipePath)
+ : TWriteOperation(std::move(buffer))
, PipePath_(std::move(pipePath))
{ }
@@ -366,8 +367,8 @@ class TWriteVOperation
: public IIOOperation
{
public:
- explicit TWriteVOperation(const TSharedRefArray& buffers)
- : Buffers_(buffers)
+ explicit TWriteVOperation(TSharedRefArray buffers)
+ : Buffers_(std::move(buffers))
{ }
TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override
@@ -427,11 +428,11 @@ public:
}
private:
- TSharedRefArray Buffers_;
+ const TSharedRefArray Buffers_;
+ const TPromise<void> ResultPromise_ = NewPromise<void>();
+
size_t Index_ = 0;
size_t Position_ = 0;
-
- TPromise<void> ResultPromise_ = NewPromise<void>();
};
////////////////////////////////////////////////////////////////////////////////
@@ -471,7 +472,7 @@ public:
private:
const bool ShutdownRead_;
- TPromise<void> ResultPromise_ = NewPromise<void>();
+ const TPromise<void> ResultPromise_ = NewPromise<void>();
};
////////////////////////////////////////////////////////////////////////////////
@@ -513,7 +514,7 @@ public:
DoIO(&ReadDirection_, Any(control & EPollControl::Read));
if (Any(control & EPollControl::ReadHup)) {
- NotifyPeerDisconnected();
+ OnPeerDisconnected();
}
}
@@ -526,12 +527,12 @@ public:
YT_VERIFY(!ReadDirection_.Running);
YT_VERIFY(!WriteDirection_.Running);
- auto shutdownError = TError("Connection is shut down");
+ auto error = AnnotateError(TError("Connection is shut down"));
if (WriteError_.IsOK()) {
- WriteError_ = shutdownError;
+ WriteError_ = error;
}
if (ReadError_.IsOK()) {
- ReadError_ = shutdownError;
+ ReadError_ = error;
}
ShutdownRequested_ = true;
@@ -557,7 +558,7 @@ public:
YT_VERIFY(TryClose(FD_, false));
FD_ = -1;
- NotifyPeerDisconnected();
+ OnPeerDisconnected();
ReadDirection_.OnShutdown();
WriteDirection_.OnShutdown();
@@ -628,8 +629,8 @@ public:
TFuture<void> Close()
{
- auto error = AnnotateError(TError("Connection closed"));
- return AbortIO(error);
+ YT_LOG_DEBUG("Closing connection");
+ return AbortIO(TError("Connection closed"));
}
bool IsIdle()
@@ -640,11 +641,13 @@ public:
WriteError_.IsOK() &&
!WriteDirection_.Operation &&
!ReadDirection_.Operation &&
- SynchronousIOCount_ == 0;
+ SynchronousIOCount_ == 0 &&
+ !PeerDisconnectedList_.IsFired();
}
TFuture<void> Abort(const TError& error)
{
+ YT_LOG_DEBUG(error, "Aborting connection");
return AbortIO(error);
}
@@ -736,23 +739,16 @@ public:
}
}
- void SubscribePeerDisconnect(TCallback<void()> cb)
+ void SubscribePeerDisconnect(TCallback<void()> callback)
{
- {
- auto guard = Guard(Lock_);
- if (!PeerDisconnected_) {
- OnPeerDisconnected_.push_back(std::move(cb));
- return;
- }
- }
-
- cb();
+ PeerDisconnectedList_.Subscribe(std::move(callback));
}
private:
const TConnectionId Id_ = TConnectionId::Create();
const TString Endpoint_;
const TString LoggingTag_;
+ const NLogging::TLogger Logger;
const TNetworkAddress LocalAddress_;
const TNetworkAddress RemoteAddress_;
TFileDescriptor FD_ = -1;
@@ -771,10 +767,11 @@ private:
TFDConnectionImpl(
TFileDescriptor fd,
TString filePath,
- const IPollerPtr& poller,
+ IPollerPtr poller,
bool useDeliveryFence)
: Endpoint_(Format("File{%v}", filePath))
- , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_))
+ , LoggingTag_(MakeLoggingTag(Id_, Endpoint_))
+ , Logger(NetLogger().WithRawTag(LoggingTag_))
, FD_(fd)
, Poller_(std::move(poller))
, UseDeliveryFence_(useDeliveryFence)
@@ -787,13 +784,19 @@ private:
const TNetworkAddress& remoteAddress,
IPollerPtr poller)
: Endpoint_(Format("FD{%v<->%v}", localAddress, remoteAddress))
- , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_))
+ , LoggingTag_(MakeLoggingTag(Id_, Endpoint_))
+ , Logger(NetLogger().WithRawTag(LoggingTag_))
, LocalAddress_(localAddress)
, RemoteAddress_(remoteAddress)
, FD_(fd)
, Poller_(std::move(poller))
{ }
+ ~TFDConnectionImpl()
+ {
+ YT_LOG_DEBUG("Connection destroyed");
+ }
+
DECLARE_NEW_FRIEND()
class TSynchronousIOGuard
@@ -891,8 +894,7 @@ private:
TError ReadError_;
const TPromise<void> ShutdownPromise_ = NewPromise<void>();
- bool PeerDisconnected_ = false;
- std::vector<TCallback<void()>> OnPeerDisconnected_;
+ TSingleShotCallbackList<void()> PeerDisconnectedList_;
TClosure AbortFromReadTimeout_;
TClosure AbortFromWriteTimeout_;
@@ -900,6 +902,13 @@ private:
TDelayedExecutorCookie ReadTimeoutCookie_;
TDelayedExecutorCookie WriteTimeoutCookie_;
+ static TString MakeLoggingTag(TConnectionId id, const TString& endpoint)
+ {
+ return Format("ConnectionId: %v, Endpoint: %v",
+ id,
+ endpoint);
+ }
+
TError AnnotateError(const TError& error) const
{
return error
@@ -925,12 +934,13 @@ private:
void Init()
{
+ YT_LOG_DEBUG("Connection created");
+
AbortFromReadTimeout_ = BIND(&TFDConnectionImpl::AbortFromReadTimeout, MakeWeak(this));
AbortFromWriteTimeout_ = BIND(&TFDConnectionImpl::AbortFromWriteTimeout, MakeWeak(this));
if (!Poller_->TryRegister(this)) {
- WriteError_ = TError("Cannot register connection pollable");
- ReadError_ = WriteError_;
+ ReadError_ = WriteError_ = AnnotateError(TError("Cannot register connection pollable"));
return;
}
@@ -948,15 +958,13 @@ private:
switch (direction) {
case EDirection::Read:
return ReadError_;
- case EDirection::Write: {
+ case EDirection::Write:
// We want to read if there were write errors before, but we don't want to write if there were read errors,
// because it looks useless.
- auto error = WriteError_;
- if (error.IsOK() && !ReadError_.IsOK()) {
- error = ReadError_;
+ if (!WriteError_.IsOK()) {
+ return WriteError_;
}
- return error;
- }
+ return ReadError_;
}
}
@@ -969,7 +977,6 @@ private:
auto guard = Guard(Lock_);
error = GetCurrentError(direction->Direction);
-
if (error.IsOK()) {
if (direction->Operation) {
THROW_ERROR(AnnotateError(TError("Another IO operation is in progress")));
@@ -1096,11 +1103,12 @@ private:
auto guard = Guard(Lock_);
// In case of read errors we have called Unarm and Unregister already.
bool needUnarmAndUnregister = ReadError_.IsOK();
+ auto annotatedError = AnnotateError(error);
if (WriteError_.IsOK()) {
- WriteError_ = error;
+ WriteError_ = annotatedError;
}
if (ReadError_.IsOK()) {
- ReadError_ = error;
+ ReadError_ = annotatedError;
}
if (needUnarmAndUnregister) {
Poller_->Unarm(FD_, this);
@@ -1120,16 +1128,10 @@ private:
YT_UNUSED_FUTURE(Abort(TError("Write timeout")));
}
- void NotifyPeerDisconnected()
+ void OnPeerDisconnected()
{
- std::vector<TCallback<void()>> callbacks;
- {
- auto guard = Guard(Lock_);
- PeerDisconnected_ = true;
- callbacks = std::move(OnPeerDisconnected_);
- }
- for (const auto& cb : callbacks) {
- cb();
+ if (PeerDisconnectedList_.Fire()) {
+ YT_LOG_DEBUG("Peer disconnected");
}
}
};
@@ -1273,7 +1275,7 @@ public:
private:
const TFDConnectionImplPtr Impl_;
- TRefCountedPtr PipeHolder_;
+ const TRefCountedPtr PipeHolder_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h
index 894b7482220..4078e114609 100644
--- a/yt/yt/core/net/connection.h
+++ b/yt/yt/core/net/connection.h
@@ -83,7 +83,7 @@ struct IConnection
TFuture<void> Abort() override = 0;
// SubscribePeerDisconnect is best effort and is not guaranteed to fire.
- virtual void SubscribePeerDisconnect(TCallback<void()> cb) = 0;
+ virtual void SubscribePeerDisconnect(TCallback<void()> callback) = 0;
};
DEFINE_REFCOUNTED_TYPE(IConnection)