diff options
author | yumkam <yumkam7@ydb.tech> | 2025-02-24 14:52:14 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-24 14:52:14 +0300 |
commit | f3e096874baf532262868b224f4f8b2224ec7bcc (patch) | |
tree | faa074c52d06976b50babd30bb5ee1a59ef9f17e | |
parent | 9d32e6de2d92d8fc36663bd5d3198f66a3e9d6ff (diff) | |
download | ydb-f3e096874baf532262868b224f4f8b2224ec7bcc.tar.gz |
generic lookup: use retry_policy library (#13460)
-rw-r--r-- | ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h | 9 | ||||
-rw-r--r-- | ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp | 74 |
2 files changed, 45 insertions, 38 deletions
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h index 04bf62f047..49d769de43 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h @@ -90,15 +90,6 @@ namespace NYql::NDq { NConnector::NApi::TError Error; }; - struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> { - explicit TEvRetry(ui32 nextRetries) - : NextRetries(nextRetries) - { - } - - ui32 NextRetries; - }; - protected: // TODO move common logic here }; diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index 4f4562ae88..d61c899651 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -22,6 +22,8 @@ #include <yql/essentials/utils/yql_panic.h> #include <ydb/core/formats/arrow/serializer/abstract.h> +#include <library/cpp/retry/retry_policy.h> + namespace NYql::NDq { using namespace NActors; @@ -60,6 +62,12 @@ namespace NYql::NDq { public TGenericBaseActor<TGenericLookupActor> { using TBase = TGenericBaseActor<TGenericLookupActor>; + using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>; + using ILookupRetryState = ILookupRetryPolicy::IRetryState; + + struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> { + }; + public: TGenericLookupActor( NConnector::IClient::TPtr connectorClient, @@ -86,6 +94,24 @@ namespace NYql::NDq { , HolderFactory(holderFactory) , ColumnDestinations(CreateColumnDestination()) , MaxKeysInRequest(maxKeysInRequest) + , RetryPolicy( + ILookupRetryPolicy::GetExponentialBackoffPolicy( + /* retryClassFunction */ + [](const NYdbGrpc::TGrpcStatus& status) { + if (NConnector::GrpcStatusNeedsRetry(status)) { + return ERetryErrorClass::ShortRetry; + } + if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) { + return ERetryErrorClass::ShortRetry; // TODO LongRetry? + } + return ERetryErrorClass::NoRetry; + }, + /* minDelay */ TDuration::MilliSeconds(1), + /* minLongRetryDelay */ TDuration::MilliSeconds(500), + /* maxDelay */ TDuration::Seconds(1), + /* maxRetries */ RequestRetriesLimit, + /* maxTime */ TDuration::Minutes(5), + /* scaleFactor */ 2)) { InitMonCounters(taskCounters); } @@ -156,7 +182,7 @@ namespace NYql::NDq { hFunc(TEvReadSplitsPart, Handle); hFunc(TEvReadSplitsFinished, Handle); hFunc(TEvError, Handle); - hFunc(TEvRetry, Handle); + hFunc(TEvLookupRetry, Handle); hFunc(NActors::TEvents::TEvPoison, Handle);) void Handle(TEvListSplitsIterator::TPtr ev) { @@ -165,7 +191,7 @@ namespace NYql::NDq { [ actorSystem = TActivationContext::ActorSystem(), selfId = SelfId(), - retriesRemaining = RetriesRemaining + retryState = RetryState ](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) { YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector"; auto result = ExtractFromConstFuture(asyncResult); @@ -174,7 +200,7 @@ namespace NYql::NDq { auto ev = new TEvListSplitsPart(std::move(*result.Response)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); + SendRetryOrError(actorSystem, selfId, result.Status, retryState); } }); } @@ -198,7 +224,7 @@ namespace NYql::NDq { Connector->ReadSplits(readRequest, RequestTimeout).Subscribe([ actorSystem = TActivationContext::ActorSystem(), selfId = SelfId(), - retriesRemaining = RetriesRemaining + retryState = RetryState ](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) { YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector"; auto result = ExtractFromConstFuture(asyncResult); @@ -206,7 +232,7 @@ namespace NYql::NDq { auto ev = new TEvReadSplitsIterator(std::move(result.Iterator)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); + SendRetryOrError(actorSystem, selfId, result.Status, retryState); } }); } @@ -235,9 +261,8 @@ namespace NYql::NDq { actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release())); } - void Handle(TEvRetry::TPtr ev) { + void Handle(TEvLookupRetry::TPtr) { auto guard = Guard(*Alloc); - RetriesRemaining = ev->Get()->NextRetries; SendRequest(); } @@ -269,7 +294,7 @@ namespace NYql::NDq { } Request = std::move(request); - RetriesRemaining = RequestRetriesLimit; + RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy->CreateRetryState()); SendRequest(); } @@ -287,7 +312,7 @@ namespace NYql::NDq { Connector->ListSplits(splitRequest, RequestTimeout).Subscribe([ actorSystem = TActivationContext::ActorSystem(), selfId = SelfId(), - retriesRemaining = RetriesRemaining + retryState = RetryState ](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) { auto result = ExtractFromConstFuture(asyncResult); if (result.Status.Ok()) { @@ -296,7 +321,7 @@ namespace NYql::NDq { auto ev = new TEvListSplitsIterator(std::move(result.Iterator)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); + SendRetryOrError(actorSystem, selfId, result.Status, retryState); } }); if (CpuTime) { @@ -309,7 +334,7 @@ namespace NYql::NDq { [ actorSystem = TActivationContext::ActorSystem(), selfId = SelfId(), - retriesRemaining = RetriesRemaining + retryState = RetryState ](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) { auto result = ExtractFromConstFuture(asyncResult); if (result.Status.Ok()) { @@ -328,7 +353,7 @@ namespace NYql::NDq { auto ev = new TEvReadSplitsFinished(std::move(result.Status)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); + SendRetryOrError(actorSystem, selfId, result.Status, retryState); } }); } @@ -394,22 +419,12 @@ namespace NYql::NDq { new TEvError(std::move(error))); } - static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) { - if (NConnector::GrpcStatusNeedsRetry(status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) { - if (retriesRemaining) { - const auto retry = RequestRetriesLimit - retriesRemaining; - const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s - // << TODO tune/tweak - YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay; - --retriesRemaining; - if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) { - // if error was deadline, retry only once - retriesRemaining = 0; // TODO tune/tweak - } - actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining))); - return; - } - YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit; + static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) { + auto nextRetry = retryState->GetNextRetryDelay(status); + if (nextRetry) { + YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry scheduled in " << *nextRetry; + actorSystem->Schedule(*nextRetry, new IEventHandle(selfId, selfId, new TEvLookupRetry())); + return; } SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status)); } @@ -509,7 +524,8 @@ namespace NYql::NDq { std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request; NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult; - ui32 RetriesRemaining; + ILookupRetryPolicy::TPtr RetryPolicy; + std::shared_ptr<ILookupRetryState> RetryState; ::NMonitoring::TDynamicCounters::TCounterPtr Count; ::NMonitoring::TDynamicCounters::TCounterPtr Keys; ::NMonitoring::TDynamicCounters::TCounterPtr ResultRows; |