aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-11-02 13:24:28 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-11-02 13:24:28 +0300
commitf52533dff57d75a49beeb8f07c7633155b420074 (patch)
treef4d10abac9b61a263e9951be49f5171ade0f2c66
parent0fe86c6b4a1a50ad5fc39b3d0d4d86db0633d51c (diff)
downloadydb-f52533dff57d75a49beeb8f07c7633155b420074.tar.gz
fix
-rw-r--r--ydb/library/yql/core/yql_graph_transformer.h6
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp11
2 files changed, 11 insertions, 6 deletions
diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h
index 60d87727dd8..ba89799f2f5 100644
--- a/ydb/library/yql/core/yql_graph_transformer.h
+++ b/ydb/library/yql/core/yql_graph_transformer.h
@@ -320,7 +320,7 @@ private:
TNodeMap<TAsyncTransformCallbackFuture> Callbacks;
};
-template <typename TFuture, typename TCallback>
+template <bool RaiseIssues = true, typename TFuture, typename TCallback>
std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture>
WrapFutureCallback(const TFuture& future, const TCallback& callback, const TString& message = "") {
return std::make_pair(IGraphTransformer::TStatus::Async, future.Apply(
@@ -338,7 +338,9 @@ WrapFutureCallback(const TFuture& future, const TCallback& callback, const TStri
? TStringBuilder() << "Execution of node: " << input->Content()
: message);
});
- res.ReportIssues(ctx.IssueManager);
+
+ if constexpr (RaiseIssues)
+ res.ReportIssues(ctx.IssueManager);
if (!res.Success()) {
input->SetState(TExprNode::EState::Error);
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index bc505f924ad..6ee4e054cbe 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -806,7 +806,7 @@ private:
FlushStatisticsToState();
- return WrapFutureCallback(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ return WrapFutureCallback<false>(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback";
auto duration = TInstant::Now() - startTime;
@@ -839,7 +839,8 @@ private:
state->Metrics->IncCounter("dq", "ForceFallback");
}
return FallbackCallback(state, input, output, ctx);
- }
+ } else
+ res.ReportIssues(ctx.IssueManager);
output = input;
input->SetState(TExprNode::EState::ExecutionComplete);
@@ -1161,7 +1162,7 @@ private:
int level = 0;
// TODO: remove copy-paste
- return WrapFutureCallback(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
auto duration = TInstant::Now() - startTime;
if (state->Metrics) {
state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds());
@@ -1182,9 +1183,11 @@ private:
state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
// never fallback will be captured in yql_facade
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
+ issues.AddIssues(res.Issues());
ctx.AssociativeIssues.emplace(input.Get(), std::move(issues));
return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
- }
+ } else
+ res.ReportIssues(ctx.IssueManager);
output = input;
input->SetState(TExprNode::EState::ExecutionComplete);