diff options
author | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-08-19 22:26:50 +0300 |
---|---|---|
committer | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-08-19 22:26:50 +0300 |
commit | fb6d9f890f34f3f4711b932f7e03eccbaba25bf5 (patch) | |
tree | e73e66dd3da32a3cc41feeffa0633e413877b20b | |
parent | 15aaae743152b9028f65881844fe03193b2ff764 (diff) | |
download | ydb-fb6d9f890f34f3f4711b932f7e03eccbaba25bf5.tar.gz |
Try to fix s3 source actor fall on destroy
3 files changed, 44 insertions, 33 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 2757809c7c..d7a0a6cd5d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -78,8 +78,13 @@ struct IDqComputeActorAsyncInput { virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; // Apply side effects related to this checkpoint. virtual void LoadState(const NDqProto::TSourceState& state) = 0; - virtual void PassAway() = 0; // The same signature as IActor::PassAway() + // The same signature as IActor::PassAway(). + // It is guaranted that this method will be called with bound MKQL allocator. + // So, it is the right place to destroy all internal UnboxedValues. + virtual void PassAway() = 0; + // Do not destroy UnboxedValues inside destructor!!! + // It is called from actor system thread, and MKQL allocator is not bound in this case. virtual ~IDqComputeActorAsyncInput() = default; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 30da39c2d4..28c03341e5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -498,51 +498,55 @@ protected: Checkpoints->Receive(handle, NActors::TActivationContext::AsActorContext()); } - for (auto& [_, source] : SourcesMap) { - if (source.Actor) { - source.AsyncInput->PassAway(); + { + auto guard = MaybeBindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound + + for (auto& [_, source] : SourcesMap) { + if (source.Actor) { + source.AsyncInput->PassAway(); + } } - } - for (auto& [_, transform] : InputTransformsMap) { - if (transform.Actor) { - transform.AsyncInput->PassAway(); + for (auto& [_, transform] : InputTransformsMap) { + if (transform.Actor) { + transform.AsyncInput->PassAway(); + } } - } - for (auto& [_, sink] : SinksMap) { - if (sink.Actor) { - sink.AsyncOutput->PassAway(); + for (auto& [_, sink] : SinksMap) { + if (sink.Actor) { + sink.AsyncOutput->PassAway(); + } } - } - for (auto& [_, transform] : OutputTransformsMap) { - if (transform.Actor) { - transform.AsyncOutput->PassAway(); + for (auto& [_, transform] : OutputTransformsMap) { + if (transform.Actor) { + transform.AsyncOutput->PassAway(); + } } - } - for (auto& [_, outputChannel] : OutputChannelsMap) { - if (outputChannel.Channel) { - outputChannel.Channel->Terminate(); + for (auto& [_, outputChannel] : OutputChannelsMap) { + if (outputChannel.Channel) { + outputChannel.Channel->Terminate(); + } } - } - if (RuntimeSettings.TerminateHandler) { - RuntimeSettings.TerminateHandler(success, issues); - } + if (RuntimeSettings.TerminateHandler) { + RuntimeSettings.TerminateHandler(success, issues); + } - { - // free MKQL memory then destroy TaskRunner and Allocator - if (auto guard = MaybeBindAllocator()) { + { + if (guard) { + // free MKQL memory then destroy TaskRunner and Allocator #define CLEANUP(what) decltype(what) what##_; what.swap(what##_); - CLEANUP(InputChannelsMap); - CLEANUP(SourcesMap); - CLEANUP(InputTransformsMap); - CLEANUP(OutputChannelsMap); - CLEANUP(SinksMap); - CLEANUP(OutputTransformsMap); + CLEANUP(InputChannelsMap); + CLEANUP(SourcesMap); + CLEANUP(InputTransformsMap); + CLEANUP(OutputChannelsMap); + CLEANUP(SinksMap); + CLEANUP(OutputTransformsMap); #undef CLEANUP + } } } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 144289746f..00701adfd0 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -221,6 +221,7 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor + ContainerCache.Clear(); TActorBootstrapped<TS3ReadActor>::PassAway(); } @@ -570,6 +571,7 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor + ContainerCache.Clear(); TActorBootstrapped<TS3StreamReadActor>::PassAway(); } |