diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-07-01 22:51:21 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-07-01 22:51:21 +0300 |
commit | 688f68d8cbe0e69bfd8776f5a54c089923f38068 (patch) | |
tree | 3f0505f5ffdcbea6406df01ba8d1b24eb16cb128 | |
parent | 9172a70c81c471c9706493ffab67979dd2dc9f1b (diff) | |
download | ydb-688f68d8cbe0e69bfd8776f5a54c089923f38068.tar.gz |
YQ-1204 Catch exceptions during creation of async io actors
Catch exceptions during creation of async io actors
ref:f46ab44e9b1ad03e5281b1c6ed1d704e976a42cd
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 108 |
1 files changed, 62 insertions, 46 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index d2599525009..5cd9474552d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1323,17 +1323,21 @@ protected: const auto& inputDesc = Task.GetInputs(inputIndex); const ui64 i = inputIndex; // Crutch for clang CA_LOG_D("Create source for input " << i << " " << inputDesc); - std::tie(source.AsyncInput, source.Actor) = AsyncIoFactory->CreateDqSource( - IDqAsyncIoFactory::TSourceArguments { - .InputDesc = inputDesc, - .InputIndex = inputIndex, - .TxId = TxId, - .SecureParams = secureParams, - .TaskParams = taskParams, - .ComputeActorId = this->SelfId(), - .TypeEnv = typeEnv, - .HolderFactory = holderFactory - }); + try { + std::tie(source.AsyncInput, source.Actor) = AsyncIoFactory->CreateDqSource( + IDqAsyncIoFactory::TSourceArguments { + .InputDesc = inputDesc, + .InputIndex = inputIndex, + .TxId = TxId, + .SecureParams = secureParams, + .TaskParams = taskParams, + .ComputeActorId = this->SelfId(), + .TypeEnv = typeEnv, + .HolderFactory = holderFactory + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what(); + } this->RegisterWithSameMailbox(source.Actor); } for (auto& [inputIndex, transform] : InputTransformsMap) { @@ -1344,19 +1348,23 @@ protected: const auto& inputDesc = Task.GetInputs(inputIndex); const ui64 i = inputIndex; // Crutch for clang CA_LOG_D("Create transform for input " << i << " " << inputDesc.ShortDebugString()); - std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform( - IDqAsyncIoFactory::TInputTransformArguments { - .InputDesc = inputDesc, - .InputIndex = inputIndex, - .TxId = TxId, - .TransformInput = transform.InputBuffer, - .SecureParams = secureParams, - .TaskParams = taskParams, - .ComputeActorId = this->SelfId(), - .TypeEnv = typeEnv, - .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder - }); + try { + std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform( + IDqAsyncIoFactory::TInputTransformArguments { + .InputDesc = inputDesc, + .InputIndex = inputIndex, + .TxId = TxId, + .TransformInput = transform.InputBuffer, + .SecureParams = secureParams, + .TaskParams = taskParams, + .ComputeActorId = this->SelfId(), + .TypeEnv = typeEnv, + .HolderFactory = holderFactory, + .ProgramBuilder = *transform.ProgramBuilder + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); + } this->RegisterWithSameMailbox(transform.Actor); } } @@ -1373,18 +1381,22 @@ protected: const auto& outputDesc = Task.GetOutputs(outputIndex); const ui64 i = outputIndex; // Crutch for clang CA_LOG_D("Create transform for output " << i << " " << outputDesc.ShortDebugString()); - std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( - IDqAsyncIoFactory::TOutputTransformArguments { - .OutputDesc = outputDesc, - .OutputIndex = outputIndex, - .TxId = TxId, - .TransformOutput = transform.OutputBuffer, - .Callback = static_cast<TOutputTransformCallbacks*>(this), - .SecureParams = secureParams, - .TypeEnv = typeEnv, - .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder - }); + try { + std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( + IDqAsyncIoFactory::TOutputTransformArguments { + .OutputDesc = outputDesc, + .OutputIndex = outputIndex, + .TxId = TxId, + .TransformOutput = transform.OutputBuffer, + .Callback = static_cast<TOutputTransformCallbacks*>(this), + .SecureParams = secureParams, + .TypeEnv = typeEnv, + .HolderFactory = holderFactory, + .ProgramBuilder = *transform.ProgramBuilder + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what(); + } this->RegisterWithSameMailbox(transform.Actor); } } @@ -1394,16 +1406,20 @@ protected: const auto& outputDesc = Task.GetOutputs(outputIndex); const ui64 i = outputIndex; // Crutch for clang CA_LOG_D("Create sink for output " << i << " " << outputDesc); - std::tie(sink.AsyncOutput, sink.Actor) = AsyncIoFactory->CreateDqSink( - IDqAsyncIoFactory::TSinkArguments { - .OutputDesc = outputDesc, - .OutputIndex = outputIndex, - .TxId = TxId, - .Callback = static_cast<TSinkCallbacks*>(this), - .SecureParams = secureParams, - .TypeEnv = typeEnv, - .HolderFactory = holderFactory - }); + try { + std::tie(sink.AsyncOutput, sink.Actor) = AsyncIoFactory->CreateDqSink( + IDqAsyncIoFactory::TSinkArguments { + .OutputDesc = outputDesc, + .OutputIndex = outputIndex, + .TxId = TxId, + .Callback = static_cast<TSinkCallbacks*>(this), + .SecureParams = secureParams, + .TypeEnv = typeEnv, + .HolderFactory = holderFactory + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what(); + } this->RegisterWithSameMailbox(sink.Actor); } } |