diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-11-02 13:24:28 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-11-02 13:24:28 +0300 |
commit | f52533dff57d75a49beeb8f07c7633155b420074 (patch) | |
tree | f4d10abac9b61a263e9951be49f5171ade0f2c66 | |
parent | 0fe86c6b4a1a50ad5fc39b3d0d4d86db0633d51c (diff) | |
download | ydb-f52533dff57d75a49beeb8f07c7633155b420074.tar.gz |
fix
-rw-r--r-- | ydb/library/yql/core/yql_graph_transformer.h | 6 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 11 |
2 files changed, 11 insertions, 6 deletions
diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h index 60d87727dd8..ba89799f2f5 100644 --- a/ydb/library/yql/core/yql_graph_transformer.h +++ b/ydb/library/yql/core/yql_graph_transformer.h @@ -320,7 +320,7 @@ private: TNodeMap<TAsyncTransformCallbackFuture> Callbacks; }; -template <typename TFuture, typename TCallback> +template <bool RaiseIssues = true, typename TFuture, typename TCallback> std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> WrapFutureCallback(const TFuture& future, const TCallback& callback, const TString& message = "") { return std::make_pair(IGraphTransformer::TStatus::Async, future.Apply( @@ -338,7 +338,9 @@ WrapFutureCallback(const TFuture& future, const TCallback& callback, const TStri ? TStringBuilder() << "Execution of node: " << input->Content() : message); }); - res.ReportIssues(ctx.IssueManager); + + if constexpr (RaiseIssues) + res.ReportIssues(ctx.IssueManager); if (!res.Success()) { input->SetState(TExprNode::EState::Error); diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index bc505f924ad..6ee4e054cbe 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -806,7 +806,7 @@ private: FlushStatisticsToState(); - return WrapFutureCallback(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + return WrapFutureCallback<false>(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback"; auto duration = TInstant::Now() - startTime; @@ -839,7 +839,8 @@ private: state->Metrics->IncCounter("dq", "ForceFallback"); } return FallbackCallback(state, input, output, ctx); - } + } else + res.ReportIssues(ctx.IssueManager); output = input; input->SetState(TExprNode::EState::ExecutionComplete); @@ -1161,7 +1162,7 @@ private: int level = 0; // TODO: remove copy-paste - return WrapFutureCallback(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { auto duration = TInstant::Now() - startTime; if (state->Metrics) { state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds()); @@ -1182,9 +1183,11 @@ private: state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1)); // never fallback will be captured in yql_facade auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)}; + issues.AddIssues(res.Issues()); ctx.AssociativeIssues.emplace(input.Get(), std::move(issues)); return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error); - } + } else + res.ReportIssues(ctx.IssueManager); output = input; input->SetState(TExprNode::EState::ExecutionComplete); |