diff options
author | Maxim Kovalev <maxkovalev@ydb.tech> | 2024-12-12 13:49:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-12 10:49:02 +0000 |
commit | cfe8084bcf9bcd6328e997dedfd21c70ec379fa3 (patch) | |
tree | e64a2d53847ebac36dc74656f2a7ab406533bf74 | |
parent | 9af6b6f4c2433ba3ed20cb18a21e998c633009ee (diff) | |
download | ydb-cfe8084bcf9bcd6328e997dedfd21c70ec379fa3.tar.gz |
YQL: Skip Hybrid at Dq timeout (#12487)
7 files changed, 25 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 42ac4412f19..bca17e24fcf 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -93,7 +93,7 @@ private: issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); *ExecutionTimeoutCounter += 1; - Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED); + Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, true); }) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) }) @@ -279,7 +279,7 @@ private: Send(ev->Sender, response.Release()); } - void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode) + void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool timeout = false) { YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " with status=" << static_cast<int>(statusCode) << " issues=" << Issues.ToString(); if (Finished) { @@ -292,6 +292,7 @@ private: } IssuesToMessage(Issues, result.MutableIssues()); result.SetStatusCode(statusCode); + result.SetTimeout(timeout); Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result))); Finished = true; } diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 41eaeedf109..b488e07f65c 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -184,6 +184,7 @@ message TQueryResponse { uint64 RowsCount = 8; NYql.NDqProto.StatusIds.StatusCode StatusCode = 9; repeated NDqProto.TData Sample = 10; + bool Timeout = 11; } message TDqFailure { diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index 1ed427ddb11..2ae1c3786d5 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -98,6 +98,7 @@ message ExecuteGraphResponse { Ydb.Operations.Operation Operation = 1; repeated ResponseMetric Metric = 2; bool Truncated = 3; + bool Timeout = 4; } message SvnRevisionRequest { diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index f1daa3b7604..1f2f224fbee 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -1062,6 +1062,9 @@ private: state->Statistics[state->MetricId++] = res.Statistics; if (res.Fallback) { + if (res.Timeout) { + NotifyDqTimeout(state); + } if (state->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || state->TypeCtx->ForceDq) { auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)}; issues.AddIssues(res.Issues()); @@ -1496,6 +1499,9 @@ private: state->Metrics->IncCounter("dq", "Fallback"); } state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1)); + if (res.Timeout) { + NotifyDqTimeout(state); + } // never fallback will be captured in yql_facade auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)}; issues.AddIssues(res.Issues()); @@ -1998,6 +2004,9 @@ private: state->Metrics->IncCounter("dq", "Fallback"); } state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1)); + if (res.Timeout) { + NotifyDqTimeout(state); + } } CompleteNode(execState, node, [resIssues = res.Issues(), fallback = res.Fallback](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus { @@ -2086,6 +2095,14 @@ private: return status; } + static void NotifyDqTimeout(const TDqStatePtr& state) { + auto integrations = GetUniqueIntegrations(*state->TypeCtx); + std::for_each(integrations.cbegin(), integrations.cend(), std::bind(&IDqIntegration::NotifyDqTimeout, std::placeholders::_1)); + if (state->Metrics) { + state->Metrics->IncCounter("dq", "Timeout"); + } + } + private: TDqStatePtr State; ISkiffConverterPtr SkiffConverter; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 4d6fc4804a8..b8710aac78e 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -176,6 +176,7 @@ public: bool error = false; bool fallback = false; + result.Timeout = resp.GetTimeout(); if (status.Ok()) { YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::Ok"; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index cab6c3fdc6d..5b7c27cfe1f 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -78,6 +78,7 @@ public: bool Retriable = false; bool Truncated = false; ui64 RowsCount = 0; + bool Timeout = false; TOperationStatistics Statistics; diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 14c9a29e3c2..3d0da9ce955 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -178,6 +178,7 @@ namespace NYql::NDqs { operation.Mutableresult()->PackFrom(queryResult); *operation.Mutableissues() = result.GetIssues(); ResponseBuffer.SetTruncated(result.GetTruncated()); + ResponseBuffer.SetTimeout(result.GetTimeout()); Reply(Ydb::StatusIds::SUCCESS, statusCode > 1 || result.GetIssues().size() > 0); } |