aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authord-mokhnatkin <d-mokhnatkin@ydb.tech>2022-08-19 22:26:50 +0300
committerd-mokhnatkin <d-mokhnatkin@ydb.tech>2022-08-19 22:26:50 +0300
commitfb6d9f890f34f3f4711b932f7e03eccbaba25bf5 (patch)
treee73e66dd3da32a3cc41feeffa0633e413877b20b
parent15aaae743152b9028f65881844fe03193b2ff764 (diff)
downloadydb-fb6d9f890f34f3f4711b932f7e03eccbaba25bf5.tar.gz
Try to fix s3 source actor fall on destroy
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h7
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h68
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp2
3 files changed, 44 insertions, 33 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 2757809c7c..d7a0a6cd5d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -78,8 +78,13 @@ struct IDqComputeActorAsyncInput {
virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; // Apply side effects related to this checkpoint.
virtual void LoadState(const NDqProto::TSourceState& state) = 0;
- virtual void PassAway() = 0; // The same signature as IActor::PassAway()
+ // The same signature as IActor::PassAway().
+ // It is guaranted that this method will be called with bound MKQL allocator.
+ // So, it is the right place to destroy all internal UnboxedValues.
+ virtual void PassAway() = 0;
+ // Do not destroy UnboxedValues inside destructor!!!
+ // It is called from actor system thread, and MKQL allocator is not bound in this case.
virtual ~IDqComputeActorAsyncInput() = default;
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 30da39c2d4..28c03341e5 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -498,51 +498,55 @@ protected:
Checkpoints->Receive(handle, NActors::TActivationContext::AsActorContext());
}
- for (auto& [_, source] : SourcesMap) {
- if (source.Actor) {
- source.AsyncInput->PassAway();
+ {
+ auto guard = MaybeBindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound
+
+ for (auto& [_, source] : SourcesMap) {
+ if (source.Actor) {
+ source.AsyncInput->PassAway();
+ }
}
- }
- for (auto& [_, transform] : InputTransformsMap) {
- if (transform.Actor) {
- transform.AsyncInput->PassAway();
+ for (auto& [_, transform] : InputTransformsMap) {
+ if (transform.Actor) {
+ transform.AsyncInput->PassAway();
+ }
}
- }
- for (auto& [_, sink] : SinksMap) {
- if (sink.Actor) {
- sink.AsyncOutput->PassAway();
+ for (auto& [_, sink] : SinksMap) {
+ if (sink.Actor) {
+ sink.AsyncOutput->PassAway();
+ }
}
- }
- for (auto& [_, transform] : OutputTransformsMap) {
- if (transform.Actor) {
- transform.AsyncOutput->PassAway();
+ for (auto& [_, transform] : OutputTransformsMap) {
+ if (transform.Actor) {
+ transform.AsyncOutput->PassAway();
+ }
}
- }
- for (auto& [_, outputChannel] : OutputChannelsMap) {
- if (outputChannel.Channel) {
- outputChannel.Channel->Terminate();
+ for (auto& [_, outputChannel] : OutputChannelsMap) {
+ if (outputChannel.Channel) {
+ outputChannel.Channel->Terminate();
+ }
}
- }
- if (RuntimeSettings.TerminateHandler) {
- RuntimeSettings.TerminateHandler(success, issues);
- }
+ if (RuntimeSettings.TerminateHandler) {
+ RuntimeSettings.TerminateHandler(success, issues);
+ }
- {
- // free MKQL memory then destroy TaskRunner and Allocator
- if (auto guard = MaybeBindAllocator()) {
+ {
+ if (guard) {
+ // free MKQL memory then destroy TaskRunner and Allocator
#define CLEANUP(what) decltype(what) what##_; what.swap(what##_);
- CLEANUP(InputChannelsMap);
- CLEANUP(SourcesMap);
- CLEANUP(InputTransformsMap);
- CLEANUP(OutputChannelsMap);
- CLEANUP(SinksMap);
- CLEANUP(OutputTransformsMap);
+ CLEANUP(InputChannelsMap);
+ CLEANUP(SourcesMap);
+ CLEANUP(InputTransformsMap);
+ CLEANUP(OutputChannelsMap);
+ CLEANUP(SinksMap);
+ CLEANUP(OutputTransformsMap);
#undef CLEANUP
+ }
}
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 144289746f..00701adfd0 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -221,6 +221,7 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
+ ContainerCache.Clear();
TActorBootstrapped<TS3ReadActor>::PassAway();
}
@@ -570,6 +571,7 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
+ ContainerCache.Clear();
TActorBootstrapped<TS3StreamReadActor>::PassAway();
}