diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-03-24 15:15:14 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@yandex-team.ru> | 2022-03-24 15:15:14 +0300 |
commit | 7242975d1ea33c33921a365827be7ed0fc1a1ef5 (patch) | |
tree | 8432c5257376e9cfb38d050842a6f98afcf84b5d | |
parent | 8f435f2d7ebc63bc91e00867bb196fd369329f54 (diff) | |
download | ydb-7242975d1ea33c33921a365827be7ed0fc1a1ef5.tar.gz |
YQL-14319: Execute PassAway once
ref:9deb66e95f87842cd41a2be16fd1336a89066bc3
-rw-r--r-- | ydb/library/yql/providers/dq/actors/actor_helpers.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/result_actor_base.h | 22 | ||||
-rw-r--r-- | ydb/library/yql/utils/actors/rich_actor.h | 4 |
3 files changed, 17 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/dq/actors/actor_helpers.h b/ydb/library/yql/providers/dq/actors/actor_helpers.h index b92603ab0f..3a2c52f926 100644 --- a/ydb/library/yql/providers/dq/actors/actor_helpers.h +++ b/ydb/library/yql/providers/dq/actors/actor_helpers.h @@ -131,6 +131,7 @@ private: ESyncState SyncState_{E_IDLE}; ui32 ExpectedEventType_{0}; THashSet<ui32> CriticalEventTypes_{}; + bool Killed = false; void SyncHandler(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) { const ui32 etype = ev->GetTypeRewrite(); @@ -153,7 +154,7 @@ private: } SyncCallback_ = nullptr; - while (!DelayedEvents_.empty()) { + while (!DelayedEvents_.empty() && !Killed) { auto event = std::move(DelayedEvents_.front()); DelayedEvents_.pop_front(); InterruptedHandler_ = TBase::CurrentStateFunc(); diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index 4269c280d7..2ad0e2065e 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -21,12 +21,12 @@ namespace NYql::NDqs::NExecutionHelpers { using TBase = NYql::TSynchronizableRichActor<TDerived>; TResultActorBase( - const TVector<TString>& columns, - const NActors::TActorId& executerId, - const TString& traceId, - const TDqConfiguration::TPtr& settings, - const TString& resultType, - NActors::TActorId graphExecutionEventsId, + const TVector<TString>& columns, + const NActors::TActorId& executerId, + const TString& traceId, + const TDqConfiguration::TPtr& settings, + const TString& resultType, + NActors::TActorId graphExecutionEventsId, bool discard) : TBase(&TDerived::Handler) , ExecuterID(executerId) @@ -39,8 +39,8 @@ namespace NYql::NDqs::NExecutionHelpers { , Discard(discard) , WriteQueue() , SizeLimit( - (Settings && Settings->_AllResultsBytesLimit.Get().Defined()) - ? Settings->_AllResultsBytesLimit.Get().GetRef() + (Settings && Settings->_AllResultsBytesLimit.Get().Defined()) + ? Settings->_AllResultsBytesLimit.Get().GetRef() : 64000000) // GRPC limit , RowsLimit(settings ? Settings->_RowsLimitPerWrite.Get() : Nothing()) , Rows(0) @@ -240,10 +240,10 @@ namespace NYql::NDqs::NExecutionHelpers { } UnsafeWriteToFullResultTable(); - } + } void OnErrorInShutdownState(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { - // FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse + // FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse TIssues issues; IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); Issues.AddIssues(issues); @@ -337,7 +337,7 @@ namespace NYql::NDqs::NExecutionHelpers { private: struct TQueueItem { - TQueueItem(NDqProto::TData&& data, const TString& messageId) + TQueueItem(NDqProto::TData&& data, const TString& messageId) : WriteRequest() , MessageId(messageId) , SentProcessedEvent(false) { diff --git a/ydb/library/yql/utils/actors/rich_actor.h b/ydb/library/yql/utils/actors/rich_actor.h index a0f2ddd94b..8c506a6454 100644 --- a/ydb/library/yql/utils/actors/rich_actor.h +++ b/ydb/library/yql/utils/actors/rich_actor.h @@ -25,6 +25,7 @@ public: CleanupChildren(); UnsubscribeAll(); NActors::IActor::PassAway(); + Killed = true; } void CleanupChildren() { @@ -89,6 +90,9 @@ public: Subscriptions.insert(nodeId); } +protected: + bool Killed = false; + private: THashMap<NActors::TActorId, NActors::IEventBase*> Children; THashSet<ui32> Subscriptions; |