aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxim Kovalev <maxkovalev@ydb.tech>2024-12-12 13:49:02 +0300
committerGitHub <noreply@github.com>2024-12-12 10:49:02 +0000
commitcfe8084bcf9bcd6328e997dedfd21c70ec379fa3 (patch)
treee64a2d53847ebac36dc74656f2a7ab406533bf74
parent9af6b6f4c2433ba3ed20cb18a21e998c633009ee (diff)
downloadydb-cfe8084bcf9bcd6328e997dedfd21c70ec379fa3.tar.gz
YQL: Skip Hybrid at Dq timeout (#12487)
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp5
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto1
-rw-r--r--ydb/library/yql/providers/dq/api/protos/service.proto1
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp17
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp1
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.h1
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp1
7 files changed, 25 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index 42ac4412f19..bca17e24fcf 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -93,7 +93,7 @@ private:
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
Issues.AddIssues({issue});
*ExecutionTimeoutCounter += 1;
- Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED);
+ Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, true);
})
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
})
@@ -279,7 +279,7 @@ private:
Send(ev->Sender, response.Release());
}
- void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode)
+ void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool timeout = false)
{
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " with status=" << static_cast<int>(statusCode) << " issues=" << Issues.ToString();
if (Finished) {
@@ -292,6 +292,7 @@ private:
}
IssuesToMessage(Issues, result.MutableIssues());
result.SetStatusCode(statusCode);
+ result.SetTimeout(timeout);
Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result)));
Finished = true;
}
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index 41eaeedf109..b488e07f65c 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -184,6 +184,7 @@ message TQueryResponse {
uint64 RowsCount = 8;
NYql.NDqProto.StatusIds.StatusCode StatusCode = 9;
repeated NDqProto.TData Sample = 10;
+ bool Timeout = 11;
}
message TDqFailure {
diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto
index 1ed427ddb11..2ae1c3786d5 100644
--- a/ydb/library/yql/providers/dq/api/protos/service.proto
+++ b/ydb/library/yql/providers/dq/api/protos/service.proto
@@ -98,6 +98,7 @@ message ExecuteGraphResponse {
Ydb.Operations.Operation Operation = 1;
repeated ResponseMetric Metric = 2;
bool Truncated = 3;
+ bool Timeout = 4;
}
message SvnRevisionRequest {
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index f1daa3b7604..1f2f224fbee 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -1062,6 +1062,9 @@ private:
state->Statistics[state->MetricId++] = res.Statistics;
if (res.Fallback) {
+ if (res.Timeout) {
+ NotifyDqTimeout(state);
+ }
if (state->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || state->TypeCtx->ForceDq) {
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
issues.AddIssues(res.Issues());
@@ -1496,6 +1499,9 @@ private:
state->Metrics->IncCounter("dq", "Fallback");
}
state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
+ if (res.Timeout) {
+ NotifyDqTimeout(state);
+ }
// never fallback will be captured in yql_facade
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
issues.AddIssues(res.Issues());
@@ -1998,6 +2004,9 @@ private:
state->Metrics->IncCounter("dq", "Fallback");
}
state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
+ if (res.Timeout) {
+ NotifyDqTimeout(state);
+ }
}
CompleteNode(execState, node, [resIssues = res.Issues(), fallback = res.Fallback](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus {
@@ -2086,6 +2095,14 @@ private:
return status;
}
+ static void NotifyDqTimeout(const TDqStatePtr& state) {
+ auto integrations = GetUniqueIntegrations(*state->TypeCtx);
+ std::for_each(integrations.cbegin(), integrations.cend(), std::bind(&IDqIntegration::NotifyDqTimeout, std::placeholders::_1));
+ if (state->Metrics) {
+ state->Metrics->IncCounter("dq", "Timeout");
+ }
+ }
+
private:
TDqStatePtr State;
ISkiffConverterPtr SkiffConverter;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index 4d6fc4804a8..b8710aac78e 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -176,6 +176,7 @@ public:
bool error = false;
bool fallback = false;
+ result.Timeout = resp.GetTimeout();
if (status.Ok()) {
YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::Ok";
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
index cab6c3fdc6d..5b7c27cfe1f 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
@@ -78,6 +78,7 @@ public:
bool Retriable = false;
bool Truncated = false;
ui64 RowsCount = 0;
+ bool Timeout = false;
TOperationStatistics Statistics;
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index 14c9a29e3c2..3d0da9ce955 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -178,6 +178,7 @@ namespace NYql::NDqs {
operation.Mutableresult()->PackFrom(queryResult);
*operation.Mutableissues() = result.GetIssues();
ResponseBuffer.SetTruncated(result.GetTruncated());
+ ResponseBuffer.SetTimeout(result.GetTimeout());
Reply(Ydb::StatusIds::SUCCESS, statusCode > 1 || result.GetIssues().size() > 0);
}