aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormrlolthe1st <mrlolthe1st@ydb.tech>2023-12-11 14:34:16 +0300
committermrlolthe1st <mrlolthe1st@ydb.tech>2023-12-11 15:38:19 +0300
commit79565d2a316d0dae15f83bcb9719afa0eea788c2 (patch)
treea309282ffbf23f64b342dd46ae051b50ff4938f9
parent7d60b01fe2769f3f14b9ea385c09e7bbbbc5bd6a (diff)
downloadydb-79565d2a316d0dae15f83bcb9719afa0eea788c2.tar.gz
YQL-17256: Fallback from RPC Reader
YQL-17256: Fallback from RPC Reader
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp2
-rw-r--r--ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp6
-rw-r--r--ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp1
3 files changed, 7 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 611e4d02c0..a8285d9558 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -111,7 +111,7 @@ private:
} else if (line.Contains("No such transaction")) {
// YQL-15542
fallback = true;
- } else if (line.Contains("(NYT::TErrorException) Request timed out")) {
+ } else if (line.Contains("YT RPC Reader exception:")) {
// RPC reader fallback to YT
fallback = true;
} else if (line.Contains("Transaction") && line.Contains("aborted")) {
diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp
index 57b98f4824..5601687308 100644
--- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp
+++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp
@@ -116,7 +116,11 @@ std::unique_ptr<TSettingsHolder> CreateInputStreams(bool isArrow, const TString&
}))));
}
TVector<NYT::NConcurrency::IAsyncZeroCopyInputStreamPtr> rawInputs;
- NYT::NConcurrency::WaitFor(NYT::AllSucceeded(waitFor)).ValueOrThrow().swap(rawInputs);
+ auto result = NYT::NConcurrency::WaitFor(NYT::AllSucceeded(waitFor));
+ if (!result.IsOK()) {
+ Cerr << "YT RPC Reader exception:\n";
+ }
+ result.ValueOrThrow().swap(rawInputs);
return std::make_unique<TSettingsHolder>(std::move(connection), std::move(client), std::move(rawInputs), std::move(originalIndexes));
}
diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp
index 71dc3bc453..dd7a447540 100644
--- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp
+++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp
@@ -187,6 +187,7 @@ bool TParallelFileInputState::NextValue() {
{
std::lock_guard lock(InnerState_->Lock);
if (!InnerState_->Error.IsOK()) {
+ Cerr << "YT RPC Reader exception:\n";
InnerState_->Error.ThrowOnError();
}
if (InnerState_->Results.empty()) {