aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-23 03:59:10 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-23 03:59:10 +0300
commitbd77aa5b75631ae18c9dc30db2cb6e43bd904145 (patch)
tree548bc072ced209c6e4bed5684b32c023e2f4e64a
parent1701ff2aa8f104bebfe3510f86c2293ee145105c (diff)
downloadydb-bd77aa5b75631ae18c9dc30db2cb6e43bd904145.tar.gz
YQ-859 Stabilize python tests
Fix counters cleanup race Retry query creation with idempotency_key Add sleep in tests for waiting Add wait to retry configs Fix test_state_load_mode ref:86b77e9e5d95b0739536879897cad1e875b6af03
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp126
1 files changed, 71 insertions, 55 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index 5a7f71d9439..1568de4b74b 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -73,19 +73,45 @@ using namespace NYql;
namespace {
-struct TEvGetTaskInternalResponse : public NActors::TEventLocal<TEvGetTaskInternalResponse, NActors::TEvents::TSystem::Completed> {
- bool Success = false;
- const TIssues Issues;
- const Yq::Private::GetTaskResult Result;
-
- TEvGetTaskInternalResponse(
- bool success,
- const TIssues& issues,
- const Yq::Private::GetTaskResult& result)
- : Success(success)
- , Issues(issues)
- , Result(result)
- { }
+struct TEvPrivate {
+ // Event ids
+ enum EEv : ui32 {
+ EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+
+ EvGetTaskInternalResponse = EvBegin,
+ EvCleanupCounters,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+
+ // Events
+ struct TEvGetTaskInternalResponse : public NActors::TEventLocal<TEvGetTaskInternalResponse, EvGetTaskInternalResponse> {
+ bool Success = false;
+ const TIssues Issues;
+ const Yq::Private::GetTaskResult Result;
+
+ TEvGetTaskInternalResponse(
+ bool success,
+ const TIssues& issues,
+ const Yq::Private::GetTaskResult& result)
+ : Success(success)
+ , Issues(issues)
+ , Result(result)
+ { }
+ };
+
+ struct TEvCleanupCounters : public NActors::TEventLocal<TEvCleanupCounters, EvCleanupCounters> {
+ TEvCleanupCounters(const TString& queryId, const NActors::TActorId& runActorId)
+ : QueryId(queryId)
+ , RunActorId(runActorId)
+ {
+ }
+
+ const TString QueryId;
+ const NActors::TActorId RunActorId;
+ };
};
template <class TElement>
@@ -93,13 +119,10 @@ TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TEl
return { field.begin(), field.end() };
}
-} // namespace
-
-constexpr auto WAKEUP_TAG_FETCH = 1;
-constexpr auto WAKEUP_TAG_CLEANUP = 2;
-
constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60);
+} // namespace
+
class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
public:
TPendingFetcher(
@@ -159,7 +182,7 @@ public:
Y_UNUSED(ctx);
DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory));
- Send(SelfId(), new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH));
+ Send(SelfId(), new NActors::TEvents::TEvWakeup());
LOG_I("STARTED");
LogScope.ConstructInPlace(NActors::TActivationContext::ActorSystem(), NKikimrServices::YQL_PROXY, Guid);
@@ -172,37 +195,32 @@ private:
HasRunningRequest = false;
}
- void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) {
- switch(ev->Get()->Tag) {
- case WAKEUP_TAG_FETCH:
- Schedule(PendingFetchPeriod, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH));
- if (!HasRunningRequest) {
- HasRunningRequest = true;
- GetPendingTask();
- }
- break;
- case WAKEUP_TAG_CLEANUP:
- auto now = Now();
- std::vector<TString> erased;
- for (auto& [queryId, counters] : CountersMap) {
- if (counters.RunActorId == TActorId() && counters.CleanupDeadline <= now) {
- if (counters.RootCountersParent) {
- counters.RootCountersParent->RemoveSubgroup("query_id", queryId);
- }
- if (counters.PublicCountersParent) {
- counters.PublicCountersParent->RemoveSubgroup("query_id", queryId);
- }
- erased.push_back(queryId);
- }
- }
- for (auto q : erased) {
- CountersMap.erase(q);
- }
- break;
+ void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr&) {
+ Schedule(PendingFetchPeriod, new NActors::TEvents::TEvWakeup());
+ if (!HasRunningRequest) {
+ HasRunningRequest = true;
+ GetPendingTask();
+ }
+ }
+
+ void HandleCleanupCounters(TEvPrivate::TEvCleanupCounters::TPtr& ev) {
+ const TString& queryId = ev->Get()->QueryId;
+ const auto countersIt = CountersMap.find(queryId);
+ if (countersIt == CountersMap.end() || countersIt->second.RunActorId != ev->Get()->RunActorId) {
+ return;
+ }
+
+ auto& counters = countersIt->second;
+ if (counters.RootCountersParent) {
+ counters.RootCountersParent->RemoveSubgroup("query_id", queryId);
+ }
+ if (counters.PublicCountersParent) {
+ counters.PublicCountersParent->RemoveSubgroup("query_id", queryId);
}
+ CountersMap.erase(countersIt);
}
- void HandleResponse(TEvGetTaskInternalResponse::TPtr& ev) {
+ void HandleResponse(TEvPrivate::TEvGetTaskInternalResponse::TPtr& ev) {
HasRunningRequest = false;
LOG_D("Got GetTask response from PrivateApi");
if (!ev->Get()->Success) {
@@ -235,9 +253,7 @@ private:
if (itC != CountersMap.end()) {
auto& info = itC->second;
if (info.RunActorId == runActorId) {
- info.RunActorId = TActorId();
- info.CleanupDeadline = Now() + CLEANUP_PERIOD;
- Schedule(CLEANUP_PERIOD, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_CLEANUP));
+ Schedule(CLEANUP_PERIOD, new TEvPrivate::TEvCleanupCounters(queryId, runActorId));
}
}
}
@@ -254,11 +270,11 @@ private:
.Subscribe([actorSystem, selfId](const NThreading::TFuture<TGetTaskResult>& future) {
const auto& wrappedResult = future.GetValue();
if (wrappedResult.IsResultSet()) {
- actorSystem->Send(selfId, new TEvGetTaskInternalResponse(
+ actorSystem->Send(selfId, new TEvPrivate::TEvGetTaskInternalResponse(
wrappedResult.IsSuccess(), wrappedResult.GetIssues(), wrappedResult.GetResult())
);
} else {
- actorSystem->Send(selfId, new TEvGetTaskInternalResponse(
+ actorSystem->Send(selfId, new TEvPrivate::TEvGetTaskInternalResponse(
false, TIssues{{TIssue{"grpc private api result is not set for get task call"}}}, Yq::Private::GetTaskResult{})
);
}
@@ -345,15 +361,16 @@ private:
RunActorMap[runActorId] = params.QueryId;
if (!params.Automatic) {
- CountersMap[params.QueryId] = { rootCountersParent, publicCountersParent, runActorId, TInstant::Zero() };
+ CountersMap[params.QueryId] = { rootCountersParent, publicCountersParent, runActorId };
}
}
STRICT_STFUNC(StateFunc,
hFunc(NActors::TEvents::TEvWakeup, HandleWakeup)
HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered)
- hFunc(TEvGetTaskInternalResponse, HandleResponse)
+ hFunc(TEvPrivate::TEvGetTaskInternalResponse, HandleResponse)
hFunc(NActors::TEvents::TEvPoisonTaken, HandlePoisonTaken)
+ hFunc(TEvPrivate::TEvCleanupCounters, HandleCleanupCounters)
);
NYq::TYqSharedResources::TPtr YqSharedResources;
@@ -391,7 +408,6 @@ private:
NMonitoring::TDynamicCounterPtr RootCountersParent;
NMonitoring::TDynamicCounterPtr PublicCountersParent;
TActorId RunActorId;
- TInstant CleanupDeadline;
};
TMap<TString, TQueryCountersInfo> CountersMap;