aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@yandex-team.ru>2022-03-24 15:15:14 +0300
committeraozeritsky <aozeritsky@yandex-team.ru>2022-03-24 15:15:14 +0300
commit7242975d1ea33c33921a365827be7ed0fc1a1ef5 (patch)
tree8432c5257376e9cfb38d050842a6f98afcf84b5d
parent8f435f2d7ebc63bc91e00867bb196fd369329f54 (diff)
downloadydb-7242975d1ea33c33921a365827be7ed0fc1a1ef5.tar.gz
YQL-14319: Execute PassAway once
ref:9deb66e95f87842cd41a2be16fd1336a89066bc3
-rw-r--r--ydb/library/yql/providers/dq/actors/actor_helpers.h3
-rw-r--r--ydb/library/yql/providers/dq/actors/result_actor_base.h22
-rw-r--r--ydb/library/yql/utils/actors/rich_actor.h4
3 files changed, 17 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/dq/actors/actor_helpers.h b/ydb/library/yql/providers/dq/actors/actor_helpers.h
index b92603ab0f..3a2c52f926 100644
--- a/ydb/library/yql/providers/dq/actors/actor_helpers.h
+++ b/ydb/library/yql/providers/dq/actors/actor_helpers.h
@@ -131,6 +131,7 @@ private:
ESyncState SyncState_{E_IDLE};
ui32 ExpectedEventType_{0};
THashSet<ui32> CriticalEventTypes_{};
+ bool Killed = false;
void SyncHandler(TAutoPtr<NActors::IEventHandle>& ev, const NActors::TActorContext& ctx) {
const ui32 etype = ev->GetTypeRewrite();
@@ -153,7 +154,7 @@ private:
}
SyncCallback_ = nullptr;
- while (!DelayedEvents_.empty()) {
+ while (!DelayedEvents_.empty() && !Killed) {
auto event = std::move(DelayedEvents_.front());
DelayedEvents_.pop_front();
InterruptedHandler_ = TBase::CurrentStateFunc();
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
index 4269c280d7..2ad0e2065e 100644
--- a/ydb/library/yql/providers/dq/actors/result_actor_base.h
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -21,12 +21,12 @@ namespace NYql::NDqs::NExecutionHelpers {
using TBase = NYql::TSynchronizableRichActor<TDerived>;
TResultActorBase(
- const TVector<TString>& columns,
- const NActors::TActorId& executerId,
- const TString& traceId,
- const TDqConfiguration::TPtr& settings,
- const TString& resultType,
- NActors::TActorId graphExecutionEventsId,
+ const TVector<TString>& columns,
+ const NActors::TActorId& executerId,
+ const TString& traceId,
+ const TDqConfiguration::TPtr& settings,
+ const TString& resultType,
+ NActors::TActorId graphExecutionEventsId,
bool discard)
: TBase(&TDerived::Handler)
, ExecuterID(executerId)
@@ -39,8 +39,8 @@ namespace NYql::NDqs::NExecutionHelpers {
, Discard(discard)
, WriteQueue()
, SizeLimit(
- (Settings && Settings->_AllResultsBytesLimit.Get().Defined())
- ? Settings->_AllResultsBytesLimit.Get().GetRef()
+ (Settings && Settings->_AllResultsBytesLimit.Get().Defined())
+ ? Settings->_AllResultsBytesLimit.Get().GetRef()
: 64000000) // GRPC limit
, RowsLimit(settings ? Settings->_RowsLimitPerWrite.Get() : Nothing())
, Rows(0)
@@ -240,10 +240,10 @@ namespace NYql::NDqs::NExecutionHelpers {
}
UnsafeWriteToFullResultTable();
- }
+ }
void OnErrorInShutdownState(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
- // FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse
+ // FullResultWriter will always send TEvGone after this, so these issues will be passed to executor with TEvQueryResponse
TIssues issues;
IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
Issues.AddIssues(issues);
@@ -337,7 +337,7 @@ namespace NYql::NDqs::NExecutionHelpers {
private:
struct TQueueItem {
- TQueueItem(NDqProto::TData&& data, const TString& messageId)
+ TQueueItem(NDqProto::TData&& data, const TString& messageId)
: WriteRequest()
, MessageId(messageId)
, SentProcessedEvent(false) {
diff --git a/ydb/library/yql/utils/actors/rich_actor.h b/ydb/library/yql/utils/actors/rich_actor.h
index a0f2ddd94b..8c506a6454 100644
--- a/ydb/library/yql/utils/actors/rich_actor.h
+++ b/ydb/library/yql/utils/actors/rich_actor.h
@@ -25,6 +25,7 @@ public:
CleanupChildren();
UnsubscribeAll();
NActors::IActor::PassAway();
+ Killed = true;
}
void CleanupChildren() {
@@ -89,6 +90,9 @@ public:
Subscriptions.insert(nodeId);
}
+protected:
+ bool Killed = false;
+
private:
THashMap<NActors::TActorId, NActors::IEventBase*> Children;
THashSet<ui32> Subscriptions;