diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-23 03:59:10 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-23 03:59:10 +0300 |
commit | bd77aa5b75631ae18c9dc30db2cb6e43bd904145 (patch) | |
tree | 548bc072ced209c6e4bed5684b32c023e2f4e64a | |
parent | 1701ff2aa8f104bebfe3510f86c2293ee145105c (diff) | |
download | ydb-bd77aa5b75631ae18c9dc30db2cb6e43bd904145.tar.gz |
YQ-859 Stabilize python tests
Fix counters cleanup race
Retry query creation with idempotency_key
Add sleep in tests for waiting
Add wait to retry configs
Fix test_state_load_mode
ref:86b77e9e5d95b0739536879897cad1e875b6af03
-rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 126 |
1 files changed, 71 insertions, 55 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index 5a7f71d9439..1568de4b74b 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -73,19 +73,45 @@ using namespace NYql; namespace { -struct TEvGetTaskInternalResponse : public NActors::TEventLocal<TEvGetTaskInternalResponse, NActors::TEvents::TSystem::Completed> { - bool Success = false; - const TIssues Issues; - const Yq::Private::GetTaskResult Result; - - TEvGetTaskInternalResponse( - bool success, - const TIssues& issues, - const Yq::Private::GetTaskResult& result) - : Success(success) - , Issues(issues) - , Result(result) - { } +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + EvGetTaskInternalResponse = EvBegin, + EvCleanupCounters, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + // Events + struct TEvGetTaskInternalResponse : public NActors::TEventLocal<TEvGetTaskInternalResponse, EvGetTaskInternalResponse> { + bool Success = false; + const TIssues Issues; + const Yq::Private::GetTaskResult Result; + + TEvGetTaskInternalResponse( + bool success, + const TIssues& issues, + const Yq::Private::GetTaskResult& result) + : Success(success) + , Issues(issues) + , Result(result) + { } + }; + + struct TEvCleanupCounters : public NActors::TEventLocal<TEvCleanupCounters, EvCleanupCounters> { + TEvCleanupCounters(const TString& queryId, const NActors::TActorId& runActorId) + : QueryId(queryId) + , RunActorId(runActorId) + { + } + + const TString QueryId; + const NActors::TActorId RunActorId; + }; }; template <class TElement> @@ -93,13 +119,10 @@ TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TEl return { field.begin(), field.end() }; } -} // namespace - -constexpr auto WAKEUP_TAG_FETCH = 1; -constexpr auto WAKEUP_TAG_CLEANUP = 2; - constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60); +} // namespace + class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> { public: TPendingFetcher( @@ -159,7 +182,7 @@ public: Y_UNUSED(ctx); DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory)); - Send(SelfId(), new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH)); + Send(SelfId(), new NActors::TEvents::TEvWakeup()); LOG_I("STARTED"); LogScope.ConstructInPlace(NActors::TActivationContext::ActorSystem(), NKikimrServices::YQL_PROXY, Guid); @@ -172,37 +195,32 @@ private: HasRunningRequest = false; } - void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) { - switch(ev->Get()->Tag) { - case WAKEUP_TAG_FETCH: - Schedule(PendingFetchPeriod, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_FETCH)); - if (!HasRunningRequest) { - HasRunningRequest = true; - GetPendingTask(); - } - break; - case WAKEUP_TAG_CLEANUP: - auto now = Now(); - std::vector<TString> erased; - for (auto& [queryId, counters] : CountersMap) { - if (counters.RunActorId == TActorId() && counters.CleanupDeadline <= now) { - if (counters.RootCountersParent) { - counters.RootCountersParent->RemoveSubgroup("query_id", queryId); - } - if (counters.PublicCountersParent) { - counters.PublicCountersParent->RemoveSubgroup("query_id", queryId); - } - erased.push_back(queryId); - } - } - for (auto q : erased) { - CountersMap.erase(q); - } - break; + void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr&) { + Schedule(PendingFetchPeriod, new NActors::TEvents::TEvWakeup()); + if (!HasRunningRequest) { + HasRunningRequest = true; + GetPendingTask(); + } + } + + void HandleCleanupCounters(TEvPrivate::TEvCleanupCounters::TPtr& ev) { + const TString& queryId = ev->Get()->QueryId; + const auto countersIt = CountersMap.find(queryId); + if (countersIt == CountersMap.end() || countersIt->second.RunActorId != ev->Get()->RunActorId) { + return; + } + + auto& counters = countersIt->second; + if (counters.RootCountersParent) { + counters.RootCountersParent->RemoveSubgroup("query_id", queryId); + } + if (counters.PublicCountersParent) { + counters.PublicCountersParent->RemoveSubgroup("query_id", queryId); } + CountersMap.erase(countersIt); } - void HandleResponse(TEvGetTaskInternalResponse::TPtr& ev) { + void HandleResponse(TEvPrivate::TEvGetTaskInternalResponse::TPtr& ev) { HasRunningRequest = false; LOG_D("Got GetTask response from PrivateApi"); if (!ev->Get()->Success) { @@ -235,9 +253,7 @@ private: if (itC != CountersMap.end()) { auto& info = itC->second; if (info.RunActorId == runActorId) { - info.RunActorId = TActorId(); - info.CleanupDeadline = Now() + CLEANUP_PERIOD; - Schedule(CLEANUP_PERIOD, new NActors::TEvents::TEvWakeup(WAKEUP_TAG_CLEANUP)); + Schedule(CLEANUP_PERIOD, new TEvPrivate::TEvCleanupCounters(queryId, runActorId)); } } } @@ -254,11 +270,11 @@ private: .Subscribe([actorSystem, selfId](const NThreading::TFuture<TGetTaskResult>& future) { const auto& wrappedResult = future.GetValue(); if (wrappedResult.IsResultSet()) { - actorSystem->Send(selfId, new TEvGetTaskInternalResponse( + actorSystem->Send(selfId, new TEvPrivate::TEvGetTaskInternalResponse( wrappedResult.IsSuccess(), wrappedResult.GetIssues(), wrappedResult.GetResult()) ); } else { - actorSystem->Send(selfId, new TEvGetTaskInternalResponse( + actorSystem->Send(selfId, new TEvPrivate::TEvGetTaskInternalResponse( false, TIssues{{TIssue{"grpc private api result is not set for get task call"}}}, Yq::Private::GetTaskResult{}) ); } @@ -345,15 +361,16 @@ private: RunActorMap[runActorId] = params.QueryId; if (!params.Automatic) { - CountersMap[params.QueryId] = { rootCountersParent, publicCountersParent, runActorId, TInstant::Zero() }; + CountersMap[params.QueryId] = { rootCountersParent, publicCountersParent, runActorId }; } } STRICT_STFUNC(StateFunc, hFunc(NActors::TEvents::TEvWakeup, HandleWakeup) HFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) - hFunc(TEvGetTaskInternalResponse, HandleResponse) + hFunc(TEvPrivate::TEvGetTaskInternalResponse, HandleResponse) hFunc(NActors::TEvents::TEvPoisonTaken, HandlePoisonTaken) + hFunc(TEvPrivate::TEvCleanupCounters, HandleCleanupCounters) ); NYq::TYqSharedResources::TPtr YqSharedResources; @@ -391,7 +408,6 @@ private: NMonitoring::TDynamicCounterPtr RootCountersParent; NMonitoring::TDynamicCounterPtr PublicCountersParent; TActorId RunActorId; - TInstant CleanupDeadline; }; TMap<TString, TQueryCountersInfo> CountersMap; |