diff options
author | danilalexeev <danilalexeev@yandex-team.com> | 2024-03-14 11:26:38 +0300 |
---|---|---|
committer | danilalexeev <danilalexeev@yandex-team.com> | 2024-03-14 11:45:36 +0300 |
commit | 43f3e79465b37117220ad19202e4de5971d21cdb (patch) | |
tree | 16c669b8067306041bb12e4b2282692e6c408757 | |
parent | 678e296ea969fe485638b36b6a20122d6df2aa96 (diff) | |
download | ydb-43f3e79465b37117220ad19202e4de5971d21cdb.tar.gz |
Fix peer discovery in case of read-only leader downtime
e22067ea1e62a3d9c773a428251d189ab1d86168
-rw-r--r-- | yt/yt/core/rpc/dynamic_channel_pool.cpp | 58 | ||||
-rw-r--r-- | yt/yt/core/rpc/helpers.cpp | 23 | ||||
-rw-r--r-- | yt/yt/core/rpc/helpers.h | 3 | ||||
-rw-r--r-- | yt/yt/core/rpc/peer_discovery.cpp | 16 | ||||
-rw-r--r-- | yt/yt/core/rpc/peer_discovery.h | 13 | ||||
-rw-r--r-- | yt/yt/core/rpc/public.h | 4 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 4 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 8 | ||||
-rw-r--r-- | yt/yt_proto/yt/core/rpc/proto/rpc.proto | 2 |
9 files changed, 106 insertions, 25 deletions
diff --git a/yt/yt/core/rpc/dynamic_channel_pool.cpp b/yt/yt/core/rpc/dynamic_channel_pool.cpp index 5ce042662d..cc404497a3 100644 --- a/yt/yt/core/rpc/dynamic_channel_pool.cpp +++ b/yt/yt/core/rpc/dynamic_channel_pool.cpp @@ -203,6 +203,7 @@ private: TDelayedExecutorCookie RediscoveryCookie_; TError TerminationError_; TError PeerDiscoveryError_; + TError LastGlobalDiscoveryError_; THashSet<TString> ActiveAddresses_; THashSet<TString> BannedAddresses_; @@ -262,7 +263,7 @@ private: THashSet<TString> RequestingAddresses_; constexpr static int MaxDiscoveryErrorsToKeep = 100; - std::deque<TError> DiscoveryErrors_; + std::deque<TError> PeerDiscoveryErrors_; void DoRun() { @@ -357,6 +358,10 @@ private: BanPeer(address, error, owner->Config_->SoftBackoffTime); InvalidatePeer(address); } + } else if (rspOrError.GetCode() == NRpc::EErrorCode::GlobalDiscoveryError) { + YT_LOG_DEBUG(rspOrError, "Peer discovery session failed (Address: %v)", + address); + OnFinished(rspOrError); } else { YT_LOG_DEBUG(rspOrError, "Peer discovery request failed (Address: %v)", address); @@ -403,19 +408,19 @@ private: auto guard = Guard(SpinLock_); YT_VERIFY(RequestedAddresses_.erase(address) == 1); - DiscoveryErrors_.push_back(error); - while (std::ssize(DiscoveryErrors_) > MaxDiscoveryErrorsToKeep) { - DiscoveryErrors_.pop_front(); + PeerDiscoveryErrors_.push_back(error); + while (std::ssize(PeerDiscoveryErrors_) > MaxDiscoveryErrorsToKeep) { + PeerDiscoveryErrors_.pop_front(); } } owner->BanPeer(address, backoffTime); } - std::vector<TError> GetDiscoveryErrors() + std::vector<TError> GetPeerDiscoveryErrors() { auto guard = Guard(SpinLock_); - return {DiscoveryErrors_.begin(), DiscoveryErrors_.end()}; + return {PeerDiscoveryErrors_.begin(), PeerDiscoveryErrors_.end()}; } void AddViablePeer(const TString& address) @@ -438,7 +443,7 @@ private: owner->InvalidatePeer(address); } - void OnFinished() + void OnFinished(const TError& globalDiscoveryError = {}) { auto owner = Owner_.Lock(); if (!owner) { @@ -453,7 +458,10 @@ private: FinishedPromise_.Set(); } else { auto error = owner->MakeNoAlivePeersError() - << GetDiscoveryErrors(); + << GetPeerDiscoveryErrors(); + if (!globalDiscoveryError.IsOK()) { + error <<= globalDiscoveryError; + } YT_LOG_DEBUG(error, "Error performing peer discovery"); owner->ViablePeerRegistry_->SetError(error); FinishedPromise_.Set(error); @@ -567,6 +575,9 @@ private: YT_LOG_DEBUG("Peer is down"); } } else { + if (rspOrError.GetCode() == NRpc::EErrorCode::GlobalDiscoveryError) { + owner->SetLastGlobalDiscoveryError(rspOrError); + } YT_LOG_DEBUG(rspOrError, "Failed to poll peer"); } @@ -618,6 +629,12 @@ private: return session; } + void SetLastGlobalDiscoveryError(const TError& error) + { + auto guard = WriterGuard(SpinLock_); + LastGlobalDiscoveryError_ = error; + } + TError MakeNoAlivePeersError() { auto guard = ReaderGuard(SpinLock_); @@ -657,11 +674,13 @@ private: session->Run(); } - void OnDiscoverySessionFinished(const TError& /*error*/) + void OnDiscoverySessionFinished(const TError& globalDiscoveryError) { NTracing::TNullTraceContextGuard nullTraceContext; auto guard = WriterGuard(SpinLock_); + LastGlobalDiscoveryError_ = globalDiscoveryError; + YT_VERIFY(CurrentDiscoverySession_); CurrentDiscoverySession_.Reset(); @@ -829,6 +848,17 @@ private: ViablePeerRegistry_->UnregisterPeer(address); } + TError MaybeTransformChannelError(TError error) + { + auto guard = ReaderGuard(SpinLock_); + + if (!LastGlobalDiscoveryError_.IsOK()) { + return LastGlobalDiscoveryError_ + << std::move(error); + } + return error; + } + void OnChannelFailed( const TString& address, const IChannelPtr& channel, @@ -854,7 +884,15 @@ private: return CreateFailureDetectingChannel( ChannelFactory_->CreateChannel(address), Config_->AcknowledgementTimeout, - BIND(&TImpl::OnChannelFailed, MakeWeak(this), address)); + BIND(&TImpl::OnChannelFailed, MakeWeak(this), address), + BIND(&IsChannelFailureError), + BIND([this_ = MakeWeak(this)] (TError error) { + if (auto strongThis = this_.Lock()) { + return strongThis->MaybeTransformChannelError(std::move(error)); + } else { + return error; + } + })); } }; diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp index 27cc2c7af4..49d61147e6 100644 --- a/yt/yt/core/rpc/helpers.cpp +++ b/yt/yt/core/rpc/helpers.cpp @@ -302,11 +302,13 @@ public: IChannelPtr underlyingChannel, std::optional<TDuration> acknowledgementTimeout, TCallback<void(const IChannelPtr&, const TError&)> onFailure, - TCallback<bool(const TError&)> isError) + TCallback<bool(const TError&)> isError, + TCallback<TError(TError)> maybeTransformError) : TChannelWrapper(std::move(underlyingChannel)) , AcknowledgementTimeout_(acknowledgementTimeout) , OnFailure_(std::move(onFailure)) , IsError_(std::move(isError)) + , MaybeTransformError_(std::move(maybeTransformError)) , OnTerminated_(BIND(&TFailureDetectingChannel::OnTerminated, MakeWeak(this))) { UnderlyingChannel_->SubscribeTerminated(OnTerminated_); @@ -328,7 +330,7 @@ public: } return UnderlyingChannel_->Send( request, - New<TResponseHandler>(this, std::move(responseHandler), OnFailure_, IsError_), + New<TResponseHandler>(this, std::move(responseHandler), OnFailure_, IsError_, MaybeTransformError_), updatedOptions); } @@ -336,6 +338,7 @@ private: const std::optional<TDuration> AcknowledgementTimeout_; const TCallback<void(const IChannelPtr&, const TError&)> OnFailure_; const TCallback<bool(const TError&)> IsError_; + const TCallback<TError(TError)> MaybeTransformError_; const TCallback<void(const TError&)> OnTerminated_; @@ -352,11 +355,13 @@ private: IChannelPtr channel, IClientResponseHandlerPtr underlyingHandler, TCallback<void(const IChannelPtr&, const TError&)> onFailure, - TCallback<bool(const TError&)> isError) + TCallback<bool(const TError&)> isError, + TCallback<TError(TError)> maybeTransformError) : Channel_(std::move(channel)) , UnderlyingHandler_(std::move(underlyingHandler)) , OnFailure_(std::move(onFailure)) , IsError_(std::move(isError)) + , MaybeTransformError_(std::move(maybeTransformError)) { } void HandleAcknowledgement() override @@ -374,6 +379,11 @@ private: if (IsError_(error)) { OnFailure_.Run(Channel_, error); } + + if (MaybeTransformError_) { + error = MaybeTransformError_(std::move(error)); + } + UnderlyingHandler_->HandleError(std::move(error)); } @@ -392,6 +402,7 @@ private: const IClientResponseHandlerPtr UnderlyingHandler_; const TCallback<void(const IChannelPtr&, const TError&)> OnFailure_; const TCallback<bool(const TError&)> IsError_; + const TCallback<TError(TError)> MaybeTransformError_; }; }; @@ -399,13 +410,15 @@ IChannelPtr CreateFailureDetectingChannel( IChannelPtr underlyingChannel, std::optional<TDuration> acknowledgementTimeout, TCallback<void(const IChannelPtr&, const TError& error)> onFailure, - TCallback<bool(const TError&)> isError) + TCallback<bool(const TError&)> isError, + TCallback<TError(TError)> maybeTransformError) { return New<TFailureDetectingChannel>( std::move(underlyingChannel), acknowledgementTimeout, std::move(onFailure), - std::move(isError)); + std::move(isError), + std::move(maybeTransformError)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/helpers.h b/yt/yt/core/rpc/helpers.h index 438093c1ed..792c43a0ea 100644 --- a/yt/yt/core/rpc/helpers.h +++ b/yt/yt/core/rpc/helpers.h @@ -62,7 +62,8 @@ IChannelPtr CreateFailureDetectingChannel( IChannelPtr underlyingChannel, std::optional<TDuration> acknowledgementTimeout, TCallback<void(const IChannelPtr&, const TError& error)> onFailure, - TCallback<bool(const TError&)> isError = BIND(IsChannelFailureError)); + TCallback<bool(const TError&)> isError = BIND(IsChannelFailureError), + TCallback<TError(TError)> maybeTransformError = {}); NTracing::TTraceContextPtr GetOrCreateHandlerTraceContext( const NProto::TRequestHeader& header, diff --git a/yt/yt/core/rpc/peer_discovery.cpp b/yt/yt/core/rpc/peer_discovery.cpp index 696051c419..24595961e6 100644 --- a/yt/yt/core/rpc/peer_discovery.cpp +++ b/yt/yt/core/rpc/peer_discovery.cpp @@ -14,7 +14,7 @@ class TDefaultPeerDiscovery : public IPeerDiscovery { public: - explicit TDefaultPeerDiscovery(TDiscoverRequestHook hook) + explicit TDefaultPeerDiscovery(IDiscoverRequestHookPtr hook) : Hook_(std::move(hook)) { } @@ -30,18 +30,22 @@ public: TGenericProxy proxy(std::move(channel), serviceDescriptor); auto req = proxy.Discover(); if (Hook_) { - Hook_(req.Get()); + Hook_->EnrichRequest(req.Get()); } req->SetTimeout(timeout); req->set_reply_delay(replyDelay.GetValue()); - return req->Invoke().Apply(BIND(&TDefaultPeerDiscovery::ConvertResponse)); + return req->Invoke().Apply(BIND(&TDefaultPeerDiscovery::ConvertResponse, MakeStrong(this))); } private: - const TDiscoverRequestHook Hook_; + const IDiscoverRequestHookPtr Hook_; - static TPeerDiscoveryResponse ConvertResponse(const TIntrusivePtr<TTypedClientResponse<NProto::TRspDiscover>>& rsp) + TPeerDiscoveryResponse ConvertResponse(const TIntrusivePtr<TTypedClientResponse<NProto::TRspDiscover>>& rsp) { + if (Hook_) { + Hook_->OnResponse(rsp.Get()); + } + return TPeerDiscoveryResponse{ .IsUp = rsp->up(), .Addresses = FromProto<std::vector<TString>>(rsp->suggested_addresses()), @@ -51,7 +55,7 @@ private: //////////////////////////////////////////////////////////////////////////////// -IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook) +IPeerDiscoveryPtr CreateDefaultPeerDiscovery(IDiscoverRequestHookPtr hook) { return New<TDefaultPeerDiscovery>(std::move(hook)); } diff --git a/yt/yt/core/rpc/peer_discovery.h b/yt/yt/core/rpc/peer_discovery.h index 6157196ad6..091a31bd35 100644 --- a/yt/yt/core/rpc/peer_discovery.h +++ b/yt/yt/core/rpc/peer_discovery.h @@ -6,6 +6,17 @@ namespace NYT::NRpc { //////////////////////////////////////////////////////////////////////////////// +struct IDiscoverRequestHook + : public TRefCounted +{ + virtual void EnrichRequest(NProto::TReqDiscover* request) const = 0; + virtual void OnResponse(NProto::TRspDiscover* response) const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IDiscoverRequestHook); + +//////////////////////////////////////////////////////////////////////////////// + struct TPeerDiscoveryResponse { bool IsUp; @@ -27,7 +38,7 @@ DEFINE_REFCOUNTED_TYPE(IPeerDiscovery) //////////////////////////////////////////////////////////////////////////////// -IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook = {}); +IPeerDiscoveryPtr CreateDefaultPeerDiscovery(IDiscoverRequestHookPtr hook = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index f544383487..759f741bd8 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -78,6 +78,7 @@ DECLARE_REFCOUNTED_CLASS(TAttachmentsInputStream) DECLARE_REFCOUNTED_CLASS(TAttachmentsOutputStream) DECLARE_REFCOUNTED_STRUCT(IViablePeerRegistry) +DECLARE_REFCOUNTED_STRUCT(IDiscoverRequestHook) DECLARE_REFCOUNTED_STRUCT(IPeerDiscovery) DECLARE_REFCOUNTED_CLASS(TDynamicChannelPool) @@ -147,8 +148,6 @@ constexpr int TypicalMessagePartCount = 8; using TFeatureIdFormatter = const std::function<std::optional<TStringBuf>(int featureId)>*; -using TDiscoverRequestHook = TCallback<void(NProto::TReqDiscover*)>; - //////////////////////////////////////////////////////////////////////////////// extern const TString RequestIdAnnotation; @@ -185,6 +184,7 @@ YT_DEFINE_ERROR_ENUM( // The client should try to reduce their request rate until the server has had a chance to recover. ((SslError) (static_cast<int>(NBus::EErrorCode::SslError))) ((MemoryOverflow) (120)) + ((GlobalDiscoveryError) (121)) // Single peer discovery interrupts discovery session. ); DEFINE_ENUM(EMessageFormat, diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 87093d5295..de8600bcd2 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -2697,6 +2697,9 @@ bool TServiceBase::IsUp(const TCtxDiscoverPtr& /*context*/) return true; } +void TServiceBase::EnrichDiscoverResponse(TRspDiscover* /*response*/) +{ } + std::vector<TString> TServiceBase::SuggestAddresses() { VERIFY_THREAD_AFFINITY_ANY(); @@ -2714,6 +2717,7 @@ DEFINE_RPC_SERVICE_METHOD(TServiceBase, Discover) replyDelay); auto isUp = IsUp(context); + EnrichDiscoverResponse(response); // Fast path. if (replyDelay == TDuration::Zero() || isUp) { diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index a388116d67..e21e9d59c2 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -850,6 +850,14 @@ protected: virtual bool IsUp(const TCtxDiscoverPtr& context); //! Used by peer discovery. + //! Fills response message extensions with additional info. + /*! + * \note + * Thread affinity: any + */ + virtual void EnrichDiscoverResponse(TRspDiscover* response); + + //! Used by peer discovery. //! Returns addresses of neighboring peers to be suggested to the client. /*! * \note diff --git a/yt/yt_proto/yt/core/rpc/proto/rpc.proto b/yt/yt_proto/yt/core/rpc/proto/rpc.proto index c9d0ea291f..4fc9fceb5a 100644 --- a/yt/yt_proto/yt/core/rpc/proto/rpc.proto +++ b/yt/yt_proto/yt/core/rpc/proto/rpc.proto @@ -208,6 +208,8 @@ message TRspDiscover { required bool up = 1; repeated string suggested_addresses = 2; + + extensions 100 to max; } //////////////////////////////////////////////////////////////////////////////// |