aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/core
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.com>2024-11-12 22:07:14 +0300
committerMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-11-12 22:40:29 +0300
commit22ba6e9ad67a7b2bbfd1c634efd136112f3e78c3 (patch)
tree92ebdc7307496e53f65016369f9177dc24efd35e /yql/essentials/core
parent3c382c6d983d093ae974db36bf2c91616c317bc8 (diff)
downloadydb-22ba6e9ad67a7b2bbfd1c634efd136112f3e78c3.tar.gz
Apply GH commits
Apply GH: Stop wide combiner state from growing unlimited (#10997) Apply GH: Reconnect session has been supported (#9862) Apply GH: Handle unexpected future exception (#11091) commit_hash:c2597abeae3a153692a80cc13446423769765ae1
Diffstat (limited to 'yql/essentials/core')
-rw-r--r--yql/essentials/core/facade/yql_facade.cpp4
-rw-r--r--yql/essentials/core/yql_execution.cpp4
-rw-r--r--yql/essentials/core/yql_graph_transformer.cpp4
-rw-r--r--yql/essentials/core/yql_graph_transformer.h11
4 files changed, 17 insertions, 6 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp
index 59cfeab848..94309e2148 100644
--- a/yql/essentials/core/facade/yql_facade.cpp
+++ b/yql/essentials/core/facade/yql_facade.cpp
@@ -80,13 +80,13 @@ TProgram::TStatus SyncExecution(
(program->*method)(std::forward<Params2>(params)...);
YQL_ENSURE(future.Initialized());
future.Wait();
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
TProgram::TStatus status = future.GetValue();
while (status == TProgram::TStatus::Async) {
auto continueFuture = program->ContinueAsync();
continueFuture.Wait();
- YQL_ENSURE(!continueFuture.HasException());
+ HandleFutureException(continueFuture);
status = continueFuture.GetValue();
}
diff --git a/yql/essentials/core/yql_execution.cpp b/yql/essentials/core/yql_execution.cpp
index 2fe21e1535..5b34866487 100644
--- a/yql/essentials/core/yql_execution.cpp
+++ b/yql/essentials/core/yql_execution.cpp
@@ -525,12 +525,12 @@ public:
if (DeterministicMode) {
future.Subscribe([state](const NThreading::TFuture<void>& future) {
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
ProcessFutureResultQueue(state);
});
} else {
future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) {
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
TAutoPtr<TState::TItem> item = new TState::TItem;
item->Node = node; item->DataProvider = dataProvider;
diff --git a/yql/essentials/core/yql_graph_transformer.cpp b/yql/essentials/core/yql_graph_transformer.cpp
index 105e11846a..5248ee1597 100644
--- a/yql/essentials/core/yql_graph_transformer.cpp
+++ b/yql/essentials/core/yql_graph_transformer.cpp
@@ -249,7 +249,7 @@ IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNo
auto future = transformer.GetAsyncFuture(*root);
future.Wait();
- YQL_ENSURE(!future.HasException());
+ HandleFutureException(future);
status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
if (newRoot) {
@@ -377,7 +377,7 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
NThreading::TFuture<IGraphTransformer::TStatus> status = AsyncTransform(transformer, root, ctx, applyAsyncChanges);
status.Subscribe(
[asyncCallback](const NThreading::TFuture<IGraphTransformer::TStatus>& status) mutable -> void {
- YQL_ENSURE(!status.HasException());
+ HandleFutureException(status);
asyncCallback(status.GetValue());
});
}
diff --git a/yql/essentials/core/yql_graph_transformer.h b/yql/essentials/core/yql_graph_transformer.h
index 30ddc49ad5..d216761589 100644
--- a/yql/essentials/core/yql_graph_transformer.h
+++ b/yql/essentials/core/yql_graph_transformer.h
@@ -236,6 +236,17 @@ void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExpr
IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
TExprContext& ctx, bool applyAsyncChanges);
+template <typename T>
+void HandleFutureException(const NThreading::TFuture<T>& future) {
+ if (future.HasException()) {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage();
+ }
+ }
+}
+
class TSyncTransformerBase : public TGraphTransformerBase {
public:
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {