aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordanilalexeev <danilalexeev@yandex-team.com>2024-03-14 11:26:38 +0300
committerdanilalexeev <danilalexeev@yandex-team.com>2024-03-14 11:45:36 +0300
commit43f3e79465b37117220ad19202e4de5971d21cdb (patch)
tree16c669b8067306041bb12e4b2282692e6c408757
parent678e296ea969fe485638b36b6a20122d6df2aa96 (diff)
downloadydb-43f3e79465b37117220ad19202e4de5971d21cdb.tar.gz
Fix peer discovery in case of read-only leader downtime
e22067ea1e62a3d9c773a428251d189ab1d86168
-rw-r--r--yt/yt/core/rpc/dynamic_channel_pool.cpp58
-rw-r--r--yt/yt/core/rpc/helpers.cpp23
-rw-r--r--yt/yt/core/rpc/helpers.h3
-rw-r--r--yt/yt/core/rpc/peer_discovery.cpp16
-rw-r--r--yt/yt/core/rpc/peer_discovery.h13
-rw-r--r--yt/yt/core/rpc/public.h4
-rw-r--r--yt/yt/core/rpc/service_detail.cpp4
-rw-r--r--yt/yt/core/rpc/service_detail.h8
-rw-r--r--yt/yt_proto/yt/core/rpc/proto/rpc.proto2
9 files changed, 106 insertions, 25 deletions
diff --git a/yt/yt/core/rpc/dynamic_channel_pool.cpp b/yt/yt/core/rpc/dynamic_channel_pool.cpp
index 5ce042662d..cc404497a3 100644
--- a/yt/yt/core/rpc/dynamic_channel_pool.cpp
+++ b/yt/yt/core/rpc/dynamic_channel_pool.cpp
@@ -203,6 +203,7 @@ private:
TDelayedExecutorCookie RediscoveryCookie_;
TError TerminationError_;
TError PeerDiscoveryError_;
+ TError LastGlobalDiscoveryError_;
THashSet<TString> ActiveAddresses_;
THashSet<TString> BannedAddresses_;
@@ -262,7 +263,7 @@ private:
THashSet<TString> RequestingAddresses_;
constexpr static int MaxDiscoveryErrorsToKeep = 100;
- std::deque<TError> DiscoveryErrors_;
+ std::deque<TError> PeerDiscoveryErrors_;
void DoRun()
{
@@ -357,6 +358,10 @@ private:
BanPeer(address, error, owner->Config_->SoftBackoffTime);
InvalidatePeer(address);
}
+ } else if (rspOrError.GetCode() == NRpc::EErrorCode::GlobalDiscoveryError) {
+ YT_LOG_DEBUG(rspOrError, "Peer discovery session failed (Address: %v)",
+ address);
+ OnFinished(rspOrError);
} else {
YT_LOG_DEBUG(rspOrError, "Peer discovery request failed (Address: %v)",
address);
@@ -403,19 +408,19 @@ private:
auto guard = Guard(SpinLock_);
YT_VERIFY(RequestedAddresses_.erase(address) == 1);
- DiscoveryErrors_.push_back(error);
- while (std::ssize(DiscoveryErrors_) > MaxDiscoveryErrorsToKeep) {
- DiscoveryErrors_.pop_front();
+ PeerDiscoveryErrors_.push_back(error);
+ while (std::ssize(PeerDiscoveryErrors_) > MaxDiscoveryErrorsToKeep) {
+ PeerDiscoveryErrors_.pop_front();
}
}
owner->BanPeer(address, backoffTime);
}
- std::vector<TError> GetDiscoveryErrors()
+ std::vector<TError> GetPeerDiscoveryErrors()
{
auto guard = Guard(SpinLock_);
- return {DiscoveryErrors_.begin(), DiscoveryErrors_.end()};
+ return {PeerDiscoveryErrors_.begin(), PeerDiscoveryErrors_.end()};
}
void AddViablePeer(const TString& address)
@@ -438,7 +443,7 @@ private:
owner->InvalidatePeer(address);
}
- void OnFinished()
+ void OnFinished(const TError& globalDiscoveryError = {})
{
auto owner = Owner_.Lock();
if (!owner) {
@@ -453,7 +458,10 @@ private:
FinishedPromise_.Set();
} else {
auto error = owner->MakeNoAlivePeersError()
- << GetDiscoveryErrors();
+ << GetPeerDiscoveryErrors();
+ if (!globalDiscoveryError.IsOK()) {
+ error <<= globalDiscoveryError;
+ }
YT_LOG_DEBUG(error, "Error performing peer discovery");
owner->ViablePeerRegistry_->SetError(error);
FinishedPromise_.Set(error);
@@ -567,6 +575,9 @@ private:
YT_LOG_DEBUG("Peer is down");
}
} else {
+ if (rspOrError.GetCode() == NRpc::EErrorCode::GlobalDiscoveryError) {
+ owner->SetLastGlobalDiscoveryError(rspOrError);
+ }
YT_LOG_DEBUG(rspOrError, "Failed to poll peer");
}
@@ -618,6 +629,12 @@ private:
return session;
}
+ void SetLastGlobalDiscoveryError(const TError& error)
+ {
+ auto guard = WriterGuard(SpinLock_);
+ LastGlobalDiscoveryError_ = error;
+ }
+
TError MakeNoAlivePeersError()
{
auto guard = ReaderGuard(SpinLock_);
@@ -657,11 +674,13 @@ private:
session->Run();
}
- void OnDiscoverySessionFinished(const TError& /*error*/)
+ void OnDiscoverySessionFinished(const TError& globalDiscoveryError)
{
NTracing::TNullTraceContextGuard nullTraceContext;
auto guard = WriterGuard(SpinLock_);
+ LastGlobalDiscoveryError_ = globalDiscoveryError;
+
YT_VERIFY(CurrentDiscoverySession_);
CurrentDiscoverySession_.Reset();
@@ -829,6 +848,17 @@ private:
ViablePeerRegistry_->UnregisterPeer(address);
}
+ TError MaybeTransformChannelError(TError error)
+ {
+ auto guard = ReaderGuard(SpinLock_);
+
+ if (!LastGlobalDiscoveryError_.IsOK()) {
+ return LastGlobalDiscoveryError_
+ << std::move(error);
+ }
+ return error;
+ }
+
void OnChannelFailed(
const TString& address,
const IChannelPtr& channel,
@@ -854,7 +884,15 @@ private:
return CreateFailureDetectingChannel(
ChannelFactory_->CreateChannel(address),
Config_->AcknowledgementTimeout,
- BIND(&TImpl::OnChannelFailed, MakeWeak(this), address));
+ BIND(&TImpl::OnChannelFailed, MakeWeak(this), address),
+ BIND(&IsChannelFailureError),
+ BIND([this_ = MakeWeak(this)] (TError error) {
+ if (auto strongThis = this_.Lock()) {
+ return strongThis->MaybeTransformChannelError(std::move(error));
+ } else {
+ return error;
+ }
+ }));
}
};
diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp
index 27cc2c7af4..49d61147e6 100644
--- a/yt/yt/core/rpc/helpers.cpp
+++ b/yt/yt/core/rpc/helpers.cpp
@@ -302,11 +302,13 @@ public:
IChannelPtr underlyingChannel,
std::optional<TDuration> acknowledgementTimeout,
TCallback<void(const IChannelPtr&, const TError&)> onFailure,
- TCallback<bool(const TError&)> isError)
+ TCallback<bool(const TError&)> isError,
+ TCallback<TError(TError)> maybeTransformError)
: TChannelWrapper(std::move(underlyingChannel))
, AcknowledgementTimeout_(acknowledgementTimeout)
, OnFailure_(std::move(onFailure))
, IsError_(std::move(isError))
+ , MaybeTransformError_(std::move(maybeTransformError))
, OnTerminated_(BIND(&TFailureDetectingChannel::OnTerminated, MakeWeak(this)))
{
UnderlyingChannel_->SubscribeTerminated(OnTerminated_);
@@ -328,7 +330,7 @@ public:
}
return UnderlyingChannel_->Send(
request,
- New<TResponseHandler>(this, std::move(responseHandler), OnFailure_, IsError_),
+ New<TResponseHandler>(this, std::move(responseHandler), OnFailure_, IsError_, MaybeTransformError_),
updatedOptions);
}
@@ -336,6 +338,7 @@ private:
const std::optional<TDuration> AcknowledgementTimeout_;
const TCallback<void(const IChannelPtr&, const TError&)> OnFailure_;
const TCallback<bool(const TError&)> IsError_;
+ const TCallback<TError(TError)> MaybeTransformError_;
const TCallback<void(const TError&)> OnTerminated_;
@@ -352,11 +355,13 @@ private:
IChannelPtr channel,
IClientResponseHandlerPtr underlyingHandler,
TCallback<void(const IChannelPtr&, const TError&)> onFailure,
- TCallback<bool(const TError&)> isError)
+ TCallback<bool(const TError&)> isError,
+ TCallback<TError(TError)> maybeTransformError)
: Channel_(std::move(channel))
, UnderlyingHandler_(std::move(underlyingHandler))
, OnFailure_(std::move(onFailure))
, IsError_(std::move(isError))
+ , MaybeTransformError_(std::move(maybeTransformError))
{ }
void HandleAcknowledgement() override
@@ -374,6 +379,11 @@ private:
if (IsError_(error)) {
OnFailure_.Run(Channel_, error);
}
+
+ if (MaybeTransformError_) {
+ error = MaybeTransformError_(std::move(error));
+ }
+
UnderlyingHandler_->HandleError(std::move(error));
}
@@ -392,6 +402,7 @@ private:
const IClientResponseHandlerPtr UnderlyingHandler_;
const TCallback<void(const IChannelPtr&, const TError&)> OnFailure_;
const TCallback<bool(const TError&)> IsError_;
+ const TCallback<TError(TError)> MaybeTransformError_;
};
};
@@ -399,13 +410,15 @@ IChannelPtr CreateFailureDetectingChannel(
IChannelPtr underlyingChannel,
std::optional<TDuration> acknowledgementTimeout,
TCallback<void(const IChannelPtr&, const TError& error)> onFailure,
- TCallback<bool(const TError&)> isError)
+ TCallback<bool(const TError&)> isError,
+ TCallback<TError(TError)> maybeTransformError)
{
return New<TFailureDetectingChannel>(
std::move(underlyingChannel),
acknowledgementTimeout,
std::move(onFailure),
- std::move(isError));
+ std::move(isError),
+ std::move(maybeTransformError));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/helpers.h b/yt/yt/core/rpc/helpers.h
index 438093c1ed..792c43a0ea 100644
--- a/yt/yt/core/rpc/helpers.h
+++ b/yt/yt/core/rpc/helpers.h
@@ -62,7 +62,8 @@ IChannelPtr CreateFailureDetectingChannel(
IChannelPtr underlyingChannel,
std::optional<TDuration> acknowledgementTimeout,
TCallback<void(const IChannelPtr&, const TError& error)> onFailure,
- TCallback<bool(const TError&)> isError = BIND(IsChannelFailureError));
+ TCallback<bool(const TError&)> isError = BIND(IsChannelFailureError),
+ TCallback<TError(TError)> maybeTransformError = {});
NTracing::TTraceContextPtr GetOrCreateHandlerTraceContext(
const NProto::TRequestHeader& header,
diff --git a/yt/yt/core/rpc/peer_discovery.cpp b/yt/yt/core/rpc/peer_discovery.cpp
index 696051c419..24595961e6 100644
--- a/yt/yt/core/rpc/peer_discovery.cpp
+++ b/yt/yt/core/rpc/peer_discovery.cpp
@@ -14,7 +14,7 @@ class TDefaultPeerDiscovery
: public IPeerDiscovery
{
public:
- explicit TDefaultPeerDiscovery(TDiscoverRequestHook hook)
+ explicit TDefaultPeerDiscovery(IDiscoverRequestHookPtr hook)
: Hook_(std::move(hook))
{ }
@@ -30,18 +30,22 @@ public:
TGenericProxy proxy(std::move(channel), serviceDescriptor);
auto req = proxy.Discover();
if (Hook_) {
- Hook_(req.Get());
+ Hook_->EnrichRequest(req.Get());
}
req->SetTimeout(timeout);
req->set_reply_delay(replyDelay.GetValue());
- return req->Invoke().Apply(BIND(&TDefaultPeerDiscovery::ConvertResponse));
+ return req->Invoke().Apply(BIND(&TDefaultPeerDiscovery::ConvertResponse, MakeStrong(this)));
}
private:
- const TDiscoverRequestHook Hook_;
+ const IDiscoverRequestHookPtr Hook_;
- static TPeerDiscoveryResponse ConvertResponse(const TIntrusivePtr<TTypedClientResponse<NProto::TRspDiscover>>& rsp)
+ TPeerDiscoveryResponse ConvertResponse(const TIntrusivePtr<TTypedClientResponse<NProto::TRspDiscover>>& rsp)
{
+ if (Hook_) {
+ Hook_->OnResponse(rsp.Get());
+ }
+
return TPeerDiscoveryResponse{
.IsUp = rsp->up(),
.Addresses = FromProto<std::vector<TString>>(rsp->suggested_addresses()),
@@ -51,7 +55,7 @@ private:
////////////////////////////////////////////////////////////////////////////////
-IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook)
+IPeerDiscoveryPtr CreateDefaultPeerDiscovery(IDiscoverRequestHookPtr hook)
{
return New<TDefaultPeerDiscovery>(std::move(hook));
}
diff --git a/yt/yt/core/rpc/peer_discovery.h b/yt/yt/core/rpc/peer_discovery.h
index 6157196ad6..091a31bd35 100644
--- a/yt/yt/core/rpc/peer_discovery.h
+++ b/yt/yt/core/rpc/peer_discovery.h
@@ -6,6 +6,17 @@ namespace NYT::NRpc {
////////////////////////////////////////////////////////////////////////////////
+struct IDiscoverRequestHook
+ : public TRefCounted
+{
+ virtual void EnrichRequest(NProto::TReqDiscover* request) const = 0;
+ virtual void OnResponse(NProto::TRspDiscover* response) const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IDiscoverRequestHook);
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TPeerDiscoveryResponse
{
bool IsUp;
@@ -27,7 +38,7 @@ DEFINE_REFCOUNTED_TYPE(IPeerDiscovery)
////////////////////////////////////////////////////////////////////////////////
-IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook = {});
+IPeerDiscoveryPtr CreateDefaultPeerDiscovery(IDiscoverRequestHookPtr hook = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h
index f544383487..759f741bd8 100644
--- a/yt/yt/core/rpc/public.h
+++ b/yt/yt/core/rpc/public.h
@@ -78,6 +78,7 @@ DECLARE_REFCOUNTED_CLASS(TAttachmentsInputStream)
DECLARE_REFCOUNTED_CLASS(TAttachmentsOutputStream)
DECLARE_REFCOUNTED_STRUCT(IViablePeerRegistry)
+DECLARE_REFCOUNTED_STRUCT(IDiscoverRequestHook)
DECLARE_REFCOUNTED_STRUCT(IPeerDiscovery)
DECLARE_REFCOUNTED_CLASS(TDynamicChannelPool)
@@ -147,8 +148,6 @@ constexpr int TypicalMessagePartCount = 8;
using TFeatureIdFormatter = const std::function<std::optional<TStringBuf>(int featureId)>*;
-using TDiscoverRequestHook = TCallback<void(NProto::TReqDiscover*)>;
-
////////////////////////////////////////////////////////////////////////////////
extern const TString RequestIdAnnotation;
@@ -185,6 +184,7 @@ YT_DEFINE_ERROR_ENUM(
// The client should try to reduce their request rate until the server has had a chance to recover.
((SslError) (static_cast<int>(NBus::EErrorCode::SslError)))
((MemoryOverflow) (120))
+ ((GlobalDiscoveryError) (121)) // Single peer discovery interrupts discovery session.
);
DEFINE_ENUM(EMessageFormat,
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 87093d5295..de8600bcd2 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -2697,6 +2697,9 @@ bool TServiceBase::IsUp(const TCtxDiscoverPtr& /*context*/)
return true;
}
+void TServiceBase::EnrichDiscoverResponse(TRspDiscover* /*response*/)
+{ }
+
std::vector<TString> TServiceBase::SuggestAddresses()
{
VERIFY_THREAD_AFFINITY_ANY();
@@ -2714,6 +2717,7 @@ DEFINE_RPC_SERVICE_METHOD(TServiceBase, Discover)
replyDelay);
auto isUp = IsUp(context);
+ EnrichDiscoverResponse(response);
// Fast path.
if (replyDelay == TDuration::Zero() || isUp) {
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index a388116d67..e21e9d59c2 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -850,6 +850,14 @@ protected:
virtual bool IsUp(const TCtxDiscoverPtr& context);
//! Used by peer discovery.
+ //! Fills response message extensions with additional info.
+ /*!
+ * \note
+ * Thread affinity: any
+ */
+ virtual void EnrichDiscoverResponse(TRspDiscover* response);
+
+ //! Used by peer discovery.
//! Returns addresses of neighboring peers to be suggested to the client.
/*!
* \note
diff --git a/yt/yt_proto/yt/core/rpc/proto/rpc.proto b/yt/yt_proto/yt/core/rpc/proto/rpc.proto
index c9d0ea291f..4fc9fceb5a 100644
--- a/yt/yt_proto/yt/core/rpc/proto/rpc.proto
+++ b/yt/yt_proto/yt/core/rpc/proto/rpc.proto
@@ -208,6 +208,8 @@ message TRspDiscover
{
required bool up = 1;
repeated string suggested_addresses = 2;
+
+ extensions 100 to max;
}
////////////////////////////////////////////////////////////////////////////////