about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author yumkam <yumkam7@ydb.tech> 2025-02-24 14:52:14 +0300
committer GitHub <noreply@github.com> 2025-02-24 14:52:14 +0300
commit f3e096874baf532262868b224f4f8b2224ec7bcc (patch)
tree faa074c52d06976b50babd30bb5ee1a59ef9f17e
parent 9d32e6de2d92d8fc36663bd5d3198f66a3e9d6ff (diff)
download ydb-f3e096874baf532262868b224f4f8b2224ec7bcc.tar.gz
generic lookup: use retry_policy library (#13460)
-rw-r--r-- ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h 9
-rw-r--r-- ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp 74
2 files changed, 45 insertions, 38 deletions
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h
index 04bf62f047..49d769de43 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h
@@ -90,15 +90,6 @@ namespace NYql::NDq {
NConnector::NApi::TError Error;
};
- struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
- explicit TEvRetry(ui32 nextRetries)
- : NextRetries(nextRetries)
- {
- }
-
- ui32 NextRetries;
- };
-
protected: // TODO move common logic here
};
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp
index 4f4562ae88..d61c899651 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp
@@ -22,6 +22,8 @@
#include <yql/essentials/utils/yql_panic.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <library/cpp/retry/retry_policy.h>
+
namespace NYql::NDq {
using namespace NActors;
@@ -60,6 +62,12 @@ namespace NYql::NDq {
public TGenericBaseActor<TGenericLookupActor> {
using TBase = TGenericBaseActor<TGenericLookupActor>;
+ using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>;
+ using ILookupRetryState = ILookupRetryPolicy::IRetryState;
+
+ struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> {
+ };
+
public:
TGenericLookupActor(
NConnector::IClient::TPtr connectorClient,
@@ -86,6 +94,24 @@ namespace NYql::NDq {
, HolderFactory(holderFactory)
, ColumnDestinations(CreateColumnDestination())
, MaxKeysInRequest(maxKeysInRequest)
+ , RetryPolicy(
+ ILookupRetryPolicy::GetExponentialBackoffPolicy(
+ /* retryClassFunction */
+ [](const NYdbGrpc::TGrpcStatus& status) {
+ if (NConnector::GrpcStatusNeedsRetry(status)) {
+ return ERetryErrorClass::ShortRetry;
+ }
+ if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
+ return ERetryErrorClass::ShortRetry; // TODO LongRetry?
+ }
+ return ERetryErrorClass::NoRetry;
+ },
+ /* minDelay */ TDuration::MilliSeconds(1),
+ /* minLongRetryDelay */ TDuration::MilliSeconds(500),
+ /* maxDelay */ TDuration::Seconds(1),
+ /* maxRetries */ RequestRetriesLimit,
+ /* maxTime */ TDuration::Minutes(5),
+ /* scaleFactor */ 2))
{
InitMonCounters(taskCounters);
}
@@ -156,7 +182,7 @@ namespace NYql::NDq {
hFunc(TEvReadSplitsPart, Handle);
hFunc(TEvReadSplitsFinished, Handle);
hFunc(TEvError, Handle);
- hFunc(TEvRetry, Handle);
+ hFunc(TEvLookupRetry, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);)
void Handle(TEvListSplitsIterator::TPtr ev) {
@@ -165,7 +191,7 @@ namespace NYql::NDq {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
- retriesRemaining = RetriesRemaining
+ retryState = RetryState
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
auto result = ExtractFromConstFuture(asyncResult);
@@ -174,7 +200,7 @@ namespace NYql::NDq {
auto ev = new TEvListSplitsPart(std::move(*result.Response));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
}
@@ -198,7 +224,7 @@ namespace NYql::NDq {
Connector->ReadSplits(readRequest, RequestTimeout).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
- retriesRemaining = RetriesRemaining
+ retryState = RetryState
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
auto result = ExtractFromConstFuture(asyncResult);
@@ -206,7 +232,7 @@ namespace NYql::NDq {
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
}
@@ -235,9 +261,8 @@ namespace NYql::NDq {
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
}
- void Handle(TEvRetry::TPtr ev) {
+ void Handle(TEvLookupRetry::TPtr) {
auto guard = Guard(*Alloc);
- RetriesRemaining = ev->Get()->NextRetries;
SendRequest();
}
@@ -269,7 +294,7 @@ namespace NYql::NDq {
}
Request = std::move(request);
- RetriesRemaining = RequestRetriesLimit;
+ RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy->CreateRetryState());
SendRequest();
}
@@ -287,7 +312,7 @@ namespace NYql::NDq {
Connector->ListSplits(splitRequest, RequestTimeout).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
- retriesRemaining = RetriesRemaining
+ retryState = RetryState
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
@@ -296,7 +321,7 @@ namespace NYql::NDq {
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
if (CpuTime) {
@@ -309,7 +334,7 @@ namespace NYql::NDq {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
- retriesRemaining = RetriesRemaining
+ retryState = RetryState
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
@@ -328,7 +353,7 @@ namespace NYql::NDq {
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
- SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
+ SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
}
@@ -394,22 +419,12 @@ namespace NYql::NDq {
new TEvError(std::move(error)));
}
- static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
- if (NConnector::GrpcStatusNeedsRetry(status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
- if (retriesRemaining) {
- const auto retry = RequestRetriesLimit - retriesRemaining;
- const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s
- // << TODO tune/tweak
- YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
- --retriesRemaining;
- if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
- // if error was deadline, retry only once
- retriesRemaining = 0; // TODO tune/tweak
- }
- actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining)));
- return;
- }
- YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
+ static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) {
+ auto nextRetry = retryState->GetNextRetryDelay(status);
+ if (nextRetry) {
+ YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry scheduled in " << *nextRetry;
+ actorSystem->Schedule(*nextRetry, new IEventHandle(selfId, selfId, new TEvLookupRetry()));
+ return;
}
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
}
@@ -509,7 +524,8 @@ namespace NYql::NDq {
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
- ui32 RetriesRemaining;
+ ILookupRetryPolicy::TPtr RetryPolicy;
+ std::shared_ptr<ILookupRetryState> RetryState;
::NMonitoring::TDynamicCounters::TCounterPtr Count;
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;