aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-07-01 22:51:21 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-07-01 22:51:21 +0300
commit688f68d8cbe0e69bfd8776f5a54c089923f38068 (patch)
tree3f0505f5ffdcbea6406df01ba8d1b24eb16cb128
parent9172a70c81c471c9706493ffab67979dd2dc9f1b (diff)
downloadydb-688f68d8cbe0e69bfd8776f5a54c089923f38068.tar.gz
YQ-1204 Catch exceptions during creation of async io actors
Catch exceptions during creation of async io actors ref:f46ab44e9b1ad03e5281b1c6ed1d704e976a42cd
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h108
1 files changed, 62 insertions, 46 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index d2599525009..5cd9474552d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1323,17 +1323,21 @@ protected:
const auto& inputDesc = Task.GetInputs(inputIndex);
const ui64 i = inputIndex; // Crutch for clang
CA_LOG_D("Create source for input " << i << " " << inputDesc);
- std::tie(source.AsyncInput, source.Actor) = AsyncIoFactory->CreateDqSource(
- IDqAsyncIoFactory::TSourceArguments {
- .InputDesc = inputDesc,
- .InputIndex = inputIndex,
- .TxId = TxId,
- .SecureParams = secureParams,
- .TaskParams = taskParams,
- .ComputeActorId = this->SelfId(),
- .TypeEnv = typeEnv,
- .HolderFactory = holderFactory
- });
+ try {
+ std::tie(source.AsyncInput, source.Actor) = AsyncIoFactory->CreateDqSource(
+ IDqAsyncIoFactory::TSourceArguments {
+ .InputDesc = inputDesc,
+ .InputIndex = inputIndex,
+ .TxId = TxId,
+ .SecureParams = secureParams,
+ .TaskParams = taskParams,
+ .ComputeActorId = this->SelfId(),
+ .TypeEnv = typeEnv,
+ .HolderFactory = holderFactory
+ });
+ } catch (const std::exception& ex) {
+ throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();
+ }
this->RegisterWithSameMailbox(source.Actor);
}
for (auto& [inputIndex, transform] : InputTransformsMap) {
@@ -1344,19 +1348,23 @@ protected:
const auto& inputDesc = Task.GetInputs(inputIndex);
const ui64 i = inputIndex; // Crutch for clang
CA_LOG_D("Create transform for input " << i << " " << inputDesc.ShortDebugString());
- std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform(
- IDqAsyncIoFactory::TInputTransformArguments {
- .InputDesc = inputDesc,
- .InputIndex = inputIndex,
- .TxId = TxId,
- .TransformInput = transform.InputBuffer,
- .SecureParams = secureParams,
- .TaskParams = taskParams,
- .ComputeActorId = this->SelfId(),
- .TypeEnv = typeEnv,
- .HolderFactory = holderFactory,
- .ProgramBuilder = *transform.ProgramBuilder
- });
+ try {
+ std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform(
+ IDqAsyncIoFactory::TInputTransformArguments {
+ .InputDesc = inputDesc,
+ .InputIndex = inputIndex,
+ .TxId = TxId,
+ .TransformInput = transform.InputBuffer,
+ .SecureParams = secureParams,
+ .TaskParams = taskParams,
+ .ComputeActorId = this->SelfId(),
+ .TypeEnv = typeEnv,
+ .HolderFactory = holderFactory,
+ .ProgramBuilder = *transform.ProgramBuilder
+ });
+ } catch (const std::exception& ex) {
+ throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();
+ }
this->RegisterWithSameMailbox(transform.Actor);
}
}
@@ -1373,18 +1381,22 @@ protected:
const auto& outputDesc = Task.GetOutputs(outputIndex);
const ui64 i = outputIndex; // Crutch for clang
CA_LOG_D("Create transform for output " << i << " " << outputDesc.ShortDebugString());
- std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform(
- IDqAsyncIoFactory::TOutputTransformArguments {
- .OutputDesc = outputDesc,
- .OutputIndex = outputIndex,
- .TxId = TxId,
- .TransformOutput = transform.OutputBuffer,
- .Callback = static_cast<TOutputTransformCallbacks*>(this),
- .SecureParams = secureParams,
- .TypeEnv = typeEnv,
- .HolderFactory = holderFactory,
- .ProgramBuilder = *transform.ProgramBuilder
- });
+ try {
+ std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform(
+ IDqAsyncIoFactory::TOutputTransformArguments {
+ .OutputDesc = outputDesc,
+ .OutputIndex = outputIndex,
+ .TxId = TxId,
+ .TransformOutput = transform.OutputBuffer,
+ .Callback = static_cast<TOutputTransformCallbacks*>(this),
+ .SecureParams = secureParams,
+ .TypeEnv = typeEnv,
+ .HolderFactory = holderFactory,
+ .ProgramBuilder = *transform.ProgramBuilder
+ });
+ } catch (const std::exception& ex) {
+ throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what();
+ }
this->RegisterWithSameMailbox(transform.Actor);
}
}
@@ -1394,16 +1406,20 @@ protected:
const auto& outputDesc = Task.GetOutputs(outputIndex);
const ui64 i = outputIndex; // Crutch for clang
CA_LOG_D("Create sink for output " << i << " " << outputDesc);
- std::tie(sink.AsyncOutput, sink.Actor) = AsyncIoFactory->CreateDqSink(
- IDqAsyncIoFactory::TSinkArguments {
- .OutputDesc = outputDesc,
- .OutputIndex = outputIndex,
- .TxId = TxId,
- .Callback = static_cast<TSinkCallbacks*>(this),
- .SecureParams = secureParams,
- .TypeEnv = typeEnv,
- .HolderFactory = holderFactory
- });
+ try {
+ std::tie(sink.AsyncOutput, sink.Actor) = AsyncIoFactory->CreateDqSink(
+ IDqAsyncIoFactory::TSinkArguments {
+ .OutputDesc = outputDesc,
+ .OutputIndex = outputIndex,
+ .TxId = TxId,
+ .Callback = static_cast<TSinkCallbacks*>(this),
+ .SecureParams = secureParams,
+ .TypeEnv = typeEnv,
+ .HolderFactory = holderFactory
+ });
+ } catch (const std::exception& ex) {
+ throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what();
+ }
this->RegisterWithSameMailbox(sink.Actor);
}
}