diff options
| author | babenko <[email protected]> | 2024-09-29 20:20:04 +0300 |
|---|---|---|
| committer | babenko <[email protected]> | 2024-09-29 20:38:25 +0300 |
| commit | a03aa200c67eac6745443ff0f9478435813a21af (patch) | |
| tree | 9a2f2d0215441b283925536946a49c0ebf40d7be | |
| parent | cdfed3d3f13a1dc7b12a69216e15e04137c19c6a (diff) | |
Cosmetics and better logging in NNet connections
commit_hash:b9f0efe7c433ec6af3b87cf9f7e53eb681cb07e3
| -rw-r--r-- | yt/yt/core/net/connection.cpp | 138 | ||||
| -rw-r--r-- | yt/yt/core/net/connection.h | 2 |
2 files changed, 71 insertions, 69 deletions
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index fa0c88d4d29..af458417ea8 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -6,6 +6,8 @@ #include <yt/yt/core/misc/finally.h> +#include <yt/yt/core/actions/signal.h> + #include <yt/yt/core/net/socket.h> #include <library/cpp/yt/system/handle_eintr.h> @@ -37,7 +39,6 @@ namespace NYT::NNet { using namespace NConcurrency; -// using namespace NProfiling; #ifdef _unix_ using TIOVecBasePtr = void*; @@ -159,8 +160,8 @@ class TReadOperation : public IIOOperation { public: - explicit TReadOperation(const TSharedMutableRef& buffer) - : Buffer_(buffer) + explicit TReadOperation(TSharedMutableRef buffer) + : Buffer_(std::move(buffer)) { } TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override @@ -205,10 +206,10 @@ public: } private: - TSharedMutableRef Buffer_; - size_t Position_ = 0; + const TSharedMutableRef Buffer_; + const TPromise<size_t> ResultPromise_ = NewPromise<size_t>(); - TPromise<size_t> ResultPromise_ = NewPromise<size_t>(); + size_t Position_ = 0; }; //////////////////////////////////////////////////////////////////////////////// @@ -217,8 +218,8 @@ class TReceiveFromOperation : public IIOOperation { public: - explicit TReceiveFromOperation(const TSharedMutableRef& buffer) - : Buffer_(buffer) + explicit TReceiveFromOperation(TSharedMutableRef buffer) + : Buffer_(std::move(buffer)) { } TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override @@ -262,11 +263,11 @@ public: } private: - TSharedMutableRef Buffer_; + const TSharedMutableRef Buffer_; + const TPromise<std::pair<size_t, TNetworkAddress>> ResultPromise_ = NewPromise<std::pair<size_t, TNetworkAddress>>(); + size_t Position_ = 0; TNetworkAddress RemoteAddress_; - - TPromise<std::pair<size_t, TNetworkAddress>> ResultPromise_ = NewPromise<std::pair<size_t, TNetworkAddress>>(); }; //////////////////////////////////////////////////////////////////////////////// @@ -275,8 +276,8 @@ class TWriteOperation : public IIOOperation { public: - explicit TWriteOperation(const TSharedRef& buffer) - : Buffer_(buffer) + explicit TWriteOperation(TSharedRef buffer) + : Buffer_(std::move(buffer)) { } TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override @@ -318,10 +319,10 @@ public: } private: - TSharedRef Buffer_; - size_t Position_ = 0; + const TSharedRef Buffer_; + const TPromise<void> ResultPromise_ = NewPromise<void>(); - TPromise<void> ResultPromise_ = NewPromise<void>(); + size_t Position_ = 0; }; //////////////////////////////////////////////////////////////////////////////// @@ -330,8 +331,8 @@ class TDeliveryFencedWriteOperation : public TWriteOperation { public: - TDeliveryFencedWriteOperation(const TSharedRef& buffer, TString pipePath) - : TWriteOperation(buffer) + TDeliveryFencedWriteOperation(TSharedRef buffer, TString pipePath) + : TWriteOperation(std::move(buffer)) , PipePath_(std::move(pipePath)) { } @@ -366,8 +367,8 @@ class TWriteVOperation : public IIOOperation { public: - explicit TWriteVOperation(const TSharedRefArray& buffers) - : Buffers_(buffers) + explicit TWriteVOperation(TSharedRefArray buffers) + : Buffers_(std::move(buffers)) { } TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override @@ -427,11 +428,11 @@ public: } private: - TSharedRefArray Buffers_; + const TSharedRefArray Buffers_; + const TPromise<void> ResultPromise_ = NewPromise<void>(); + size_t Index_ = 0; size_t Position_ = 0; - - TPromise<void> ResultPromise_ = NewPromise<void>(); }; //////////////////////////////////////////////////////////////////////////////// @@ -471,7 +472,7 @@ public: private: const bool ShutdownRead_; - TPromise<void> ResultPromise_ = NewPromise<void>(); + const TPromise<void> ResultPromise_ = NewPromise<void>(); }; //////////////////////////////////////////////////////////////////////////////// @@ -513,7 +514,7 @@ public: DoIO(&ReadDirection_, Any(control & EPollControl::Read)); if (Any(control & EPollControl::ReadHup)) { - NotifyPeerDisconnected(); + OnPeerDisconnected(); } } @@ -526,12 +527,12 @@ public: YT_VERIFY(!ReadDirection_.Running); YT_VERIFY(!WriteDirection_.Running); - auto shutdownError = TError("Connection is shut down"); + auto error = AnnotateError(TError("Connection is shut down")); if (WriteError_.IsOK()) { - WriteError_ = shutdownError; + WriteError_ = error; } if (ReadError_.IsOK()) { - ReadError_ = shutdownError; + ReadError_ = error; } ShutdownRequested_ = true; @@ -557,7 +558,7 @@ public: YT_VERIFY(TryClose(FD_, false)); FD_ = -1; - NotifyPeerDisconnected(); + OnPeerDisconnected(); ReadDirection_.OnShutdown(); WriteDirection_.OnShutdown(); @@ -628,8 +629,8 @@ public: TFuture<void> Close() { - auto error = AnnotateError(TError("Connection closed")); - return AbortIO(error); + YT_LOG_DEBUG("Closing connection"); + return AbortIO(TError("Connection closed")); } bool IsIdle() @@ -640,11 +641,13 @@ public: WriteError_.IsOK() && !WriteDirection_.Operation && !ReadDirection_.Operation && - SynchronousIOCount_ == 0; + SynchronousIOCount_ == 0 && + !PeerDisconnectedList_.IsFired(); } TFuture<void> Abort(const TError& error) { + YT_LOG_DEBUG(error, "Aborting connection"); return AbortIO(error); } @@ -736,23 +739,16 @@ public: } } - void SubscribePeerDisconnect(TCallback<void()> cb) + void SubscribePeerDisconnect(TCallback<void()> callback) { - { - auto guard = Guard(Lock_); - if (!PeerDisconnected_) { - OnPeerDisconnected_.push_back(std::move(cb)); - return; - } - } - - cb(); + PeerDisconnectedList_.Subscribe(std::move(callback)); } private: const TConnectionId Id_ = TConnectionId::Create(); const TString Endpoint_; const TString LoggingTag_; + const NLogging::TLogger Logger; const TNetworkAddress LocalAddress_; const TNetworkAddress RemoteAddress_; TFileDescriptor FD_ = -1; @@ -771,10 +767,11 @@ private: TFDConnectionImpl( TFileDescriptor fd, TString filePath, - const IPollerPtr& poller, + IPollerPtr poller, bool useDeliveryFence) : Endpoint_(Format("File{%v}", filePath)) - , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_)) + , LoggingTag_(MakeLoggingTag(Id_, Endpoint_)) + , Logger(NetLogger().WithRawTag(LoggingTag_)) , FD_(fd) , Poller_(std::move(poller)) , UseDeliveryFence_(useDeliveryFence) @@ -787,13 +784,19 @@ private: const TNetworkAddress& remoteAddress, IPollerPtr poller) : Endpoint_(Format("FD{%v<->%v}", localAddress, remoteAddress)) - , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_)) + , LoggingTag_(MakeLoggingTag(Id_, Endpoint_)) + , Logger(NetLogger().WithRawTag(LoggingTag_)) , LocalAddress_(localAddress) , RemoteAddress_(remoteAddress) , FD_(fd) , Poller_(std::move(poller)) { } + ~TFDConnectionImpl() + { + YT_LOG_DEBUG("Connection destroyed"); + } + DECLARE_NEW_FRIEND() class TSynchronousIOGuard @@ -891,8 +894,7 @@ private: TError ReadError_; const TPromise<void> ShutdownPromise_ = NewPromise<void>(); - bool PeerDisconnected_ = false; - std::vector<TCallback<void()>> OnPeerDisconnected_; + TSingleShotCallbackList<void()> PeerDisconnectedList_; TClosure AbortFromReadTimeout_; TClosure AbortFromWriteTimeout_; @@ -900,6 +902,13 @@ private: TDelayedExecutorCookie ReadTimeoutCookie_; TDelayedExecutorCookie WriteTimeoutCookie_; + static TString MakeLoggingTag(TConnectionId id, const TString& endpoint) + { + return Format("ConnectionId: %v, Endpoint: %v", + id, + endpoint); + } + TError AnnotateError(const TError& error) const { return error @@ -925,12 +934,13 @@ private: void Init() { + YT_LOG_DEBUG("Connection created"); + AbortFromReadTimeout_ = BIND(&TFDConnectionImpl::AbortFromReadTimeout, MakeWeak(this)); AbortFromWriteTimeout_ = BIND(&TFDConnectionImpl::AbortFromWriteTimeout, MakeWeak(this)); if (!Poller_->TryRegister(this)) { - WriteError_ = TError("Cannot register connection pollable"); - ReadError_ = WriteError_; + ReadError_ = WriteError_ = AnnotateError(TError("Cannot register connection pollable")); return; } @@ -948,15 +958,13 @@ private: switch (direction) { case EDirection::Read: return ReadError_; - case EDirection::Write: { + case EDirection::Write: // We want to read if there were write errors before, but we don't want to write if there were read errors, // because it looks useless. - auto error = WriteError_; - if (error.IsOK() && !ReadError_.IsOK()) { - error = ReadError_; + if (!WriteError_.IsOK()) { + return WriteError_; } - return error; - } + return ReadError_; } } @@ -969,7 +977,6 @@ private: auto guard = Guard(Lock_); error = GetCurrentError(direction->Direction); - if (error.IsOK()) { if (direction->Operation) { THROW_ERROR(AnnotateError(TError("Another IO operation is in progress"))); @@ -1096,11 +1103,12 @@ private: auto guard = Guard(Lock_); // In case of read errors we have called Unarm and Unregister already. bool needUnarmAndUnregister = ReadError_.IsOK(); + auto annotatedError = AnnotateError(error); if (WriteError_.IsOK()) { - WriteError_ = error; + WriteError_ = annotatedError; } if (ReadError_.IsOK()) { - ReadError_ = error; + ReadError_ = annotatedError; } if (needUnarmAndUnregister) { Poller_->Unarm(FD_, this); @@ -1120,16 +1128,10 @@ private: YT_UNUSED_FUTURE(Abort(TError("Write timeout"))); } - void NotifyPeerDisconnected() + void OnPeerDisconnected() { - std::vector<TCallback<void()>> callbacks; - { - auto guard = Guard(Lock_); - PeerDisconnected_ = true; - callbacks = std::move(OnPeerDisconnected_); - } - for (const auto& cb : callbacks) { - cb(); + if (PeerDisconnectedList_.Fire()) { + YT_LOG_DEBUG("Peer disconnected"); } } }; @@ -1273,7 +1275,7 @@ public: private: const TFDConnectionImplPtr Impl_; - TRefCountedPtr PipeHolder_; + const TRefCountedPtr PipeHolder_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index 894b7482220..4078e114609 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -83,7 +83,7 @@ struct IConnection TFuture<void> Abort() override = 0; // SubscribePeerDisconnect is best effort and is not guaranteed to fire. - virtual void SubscribePeerDisconnect(TCallback<void()> cb) = 0; + virtual void SubscribePeerDisconnect(TCallback<void()> callback) = 0; }; DEFINE_REFCOUNTED_TYPE(IConnection) |
