aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-07-22 15:09:16 +0300
committerbbiff <bbiff@yandex-team.com>2022-07-22 15:09:16 +0300
commit52117eee30cec2122c9f5073c1299c6ea363ae6f (patch)
tree12f99d4a28e1253e076b9e5d37f7340f5474025c
parenta11acf31af106017f9c4cead4e944993140b1e5f (diff)
downloadydb-52117eee30cec2122c9f5073c1299c6ea363ae6f.tar.gz
Add Transient issues handling
-rw-r--r--CMakeLists.darwin.txt1
-rw-r--r--CMakeLists.linux.txt1
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp33
-rw-r--r--ydb/library/yql/dq/common/dq_common.h1
-rw-r--r--ydb/library/yql/providers/dq/actors/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/dq/actors/events.cpp8
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h7
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp7
-rw-r--r--ydb/library/yql/providers/dq/actors/grouped_issues.cpp72
-rw-r--r--ydb/library/yql/providers/dq/actors/grouped_issues.h40
-rw-r--r--ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp90
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller.cpp23
-rw-r--r--ydb/library/yql/providers/dq/actors/ut/CMakeLists.darwin.txt47
-rw-r--r--ydb/library/yql/providers/dq/actors/ut/CMakeLists.linux.txt51
-rw-r--r--ydb/library/yql/providers/dq/actors/ut/CMakeLists.txt13
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto4
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp1
18 files changed, 400 insertions, 8 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 4dc26d70da..98455e3778 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -1362,6 +1362,7 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl)
add_subdirectory(ydb/library/yql/providers/common/codec/ut)
add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock)
add_subdirectory(ydb/library/yql/providers/common/structured_token/ut)
+add_subdirectory(ydb/library/yql/providers/dq/actors/ut)
add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy)
add_subdirectory(ydb/library/yql/providers/s3/path_generator/ut)
add_subdirectory(ydb/library/yql/providers/s3/range_helpers/ut)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index bf49da37f2..8fd26941e7 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -1383,6 +1383,7 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl)
add_subdirectory(ydb/library/yql/providers/common/codec/ut)
add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock)
add_subdirectory(ydb/library/yql/providers/common/structured_token/ut)
+add_subdirectory(ydb/library/yql/providers/dq/actors/ut)
add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy)
add_subdirectory(ydb/library/yql/providers/s3/path_generator/ut)
add_subdirectory(ydb/library/yql/providers/s3/range_helpers/ut)
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index c2aca7904e..f76aff64ed 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -336,6 +336,7 @@ private:
hFunc(TEvents::TEvForwardPingResponse, Handle);
hFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone, Handle);
hFunc(TEvents::TEvRaiseTransientIssues, Handle);
+ hFunc(TEvDqStats, Handle);
)
STRICT_STFUNC(FinishStateFunc,
@@ -351,6 +352,7 @@ private:
IgnoreFunc(TEvents::TEvQueryActionResult);
IgnoreFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone);
IgnoreFunc(TEvents::TEvRaiseTransientIssues);
+ IgnoreFunc(TEvDqStats);
)
void KillExecuter() {
@@ -714,6 +716,12 @@ private:
Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, RaiseTransientIssuesCookie);
}
+ void Handle(TEvDqStats::TPtr& ev) {
+ Fq::Private::PingTaskRequest request;
+ *request.mutable_transient_issues() = ev->Get()->Record.issues();
+ Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0);
+ }
+
i32 UpdateResultIndices() {
i32 count = 0;
for (const auto& graphParams : DqGraphParams) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 9666cc9b20..685108ff31 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -107,6 +107,35 @@ private:
WaitingForStateResponse.push_back({ev->Sender, ev->Cookie});
}
+ void FillIssues(NYql::TIssues& issues) {
+ auto applyToAllIssuesHolders = [this](auto& function){
+ function(SourcesMap);
+ function(InputTransformsMap);
+ function(SinksMap);
+ function(OutputTransformsMap);
+ };
+
+ uint64_t expectingSize = 0;
+ auto addSize = [&expectingSize](auto& issuesHolder) {
+ for (auto& [inputIndex, source]: issuesHolder) {
+ expectingSize += source.IssuesBuffer.GetAllAddedIssuesCount();
+ }
+ };
+
+ auto exhaustIssues = [&issues](auto& issuesHolder){
+ for (auto& [inputIndex, source]: issuesHolder) {
+ auto sourceIssues = source.IssuesBuffer.Dump();
+ for (auto& issueInfo: sourceIssues) {
+ issues.AddIssues(issueInfo.Issues);
+ }
+ }
+ };
+
+ applyToAllIssuesHolders(addSize);
+ issues.Reserve(expectingSize);
+ applyToAllIssuesHolders(exhaustIssues);
+ }
+
void OnStatisticsResponse(NTaskRunnerActor::TEvStatistics::TPtr& ev) {
SentStatsRequest = false;
if (ev->Get()->Stats) {
@@ -117,6 +146,10 @@ private:
record.SetState(NDqProto::COMPUTE_STATE_EXECUTING);
record.SetStatusCode(NYql::NDqProto::StatusIds::SUCCESS);
record.SetTaskId(Task.GetId());
+ NYql::TIssues issues;
+ FillIssues(issues);
+
+ IssuesToMessage(issues, record.MutableIssues());
FillStats(record.MutableStats(), /* last */ false);
for (const auto& [actorId, cookie] : WaitingForStateResponse) {
auto state = MakeHolder<TEvDqCompute::TEvState>();
diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h
index c60a5b1f4e..d2e1395871 100644
--- a/ydb/library/yql/dq/common/dq_common.h
+++ b/ydb/library/yql/dq/common/dq_common.h
@@ -58,6 +58,7 @@ struct TBaseDqExecuterEvents {
ES_GRAPH,
ES_GRAPH_FINISHED,
ES_GRAPH_EXECUTION_EVENT,
+ ES_STATS,
};
};
diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.txt
index a607fb65d8..82a98109d3 100644
--- a/ydb/library/yql/providers/dq/actors/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/actors/CMakeLists.txt
@@ -58,4 +58,5 @@ target_sources(providers-dq-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/result_receiver.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/proto_builder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/grouped_issues.cpp
)
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index 4582deaee8..99b8813e61 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -24,12 +24,16 @@ namespace NYql::NDqs {
/*
TIssue(error).SetCode(
- needFallback ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR),
+ needFallback ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR),
*/
- TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString&)
+ TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString&)
: TEvDqFailure(statusCode) {
}
+ TEvDqStats::TEvDqStats(const TIssues& issues) {
+ IssuesToMessage(issues, Record.mutable_issues());
+ }
+
TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) {
Record = std::move(queryResult);
}
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index 220eb453ec..f226953447 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -27,6 +27,11 @@ namespace NYql::NDqs {
TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error);
};
+ struct TEvDqStats : NActors::TEventPB<TEvDqStats, NDqProto::TDqStats, TDqExecuterEvents::ES_STATS> {
+ TEvDqStats() = default;
+ TEvDqStats(const TIssues& issues);
+ };
+
struct TEvQueryResponse
: NActors::TEventPB<TEvQueryResponse, NDqProto::TQueryResponse, TDqExecuterEvents::ES_RESULT_SET> {
TEvQueryResponse() = default;
@@ -115,7 +120,7 @@ namespace NYql::NDqs {
DEFINE_SIMPLE_LOCAL_EVENT(TEvMessageProcessed, "");
explicit TEvMessageProcessed(const TString& messageId);
-
+
const TString MessageId;
};
}
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index 9caf03e4ff..0b6fdfd3a2 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -76,6 +76,7 @@ private:
HFunc(TEvAllocateWorkersResponse, OnAllocateWorkersResponse);
cFunc(TEvents::TEvPoison::EventType, PassAway);
hFunc(NActors::TEvents::TEvPoisonTaken, Handle);
+ hFunc(TEvDqStats, OnTransientIssues);
HFunc(TEvDqFailure, OnFailure);
HFunc(TEvGraphFinished, OnGraphFinished);
HFunc(TEvQueryResponse, OnQueryResponse);
@@ -296,6 +297,12 @@ private:
PassAway();
}
+ void OnTransientIssues(TEvDqStats::TPtr& ev) {
+ YQL_LOG_CTX_ROOT_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
+ Send(PrinterId, ev->Release().Release());
+ }
+
void Handle(NActors::TEvents::TEvPoisonTaken::TPtr&) {
// ignore ack from checkpoint coordinator now
}
diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp
new file mode 100644
index 0000000000..2d955fadea
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp
@@ -0,0 +1,72 @@
+#include "grouped_issues.h"
+
+
+NYql::TIssues NYql::NDq::GroupedIssues::ToIssues() {
+ NYql::TIssues issues;
+ TVector<std::pair<NYql::TIssue, IssueGroupInfo>> issueVector;
+
+ for (auto& p: Issues) {
+ issueVector.push_back(p);
+ }
+
+ std::sort(issueVector.begin(), issueVector.end(), [](const auto& a, const auto& b) -> bool {
+ return a.second.LastEncountered > b.second.LastEncountered;
+ });
+
+ for (auto& [issue, meta]: issueVector) {
+ auto modified_issue_message = issue.Message + " " + meta.InfoString();
+ auto modified_issue = NYql::TIssue(issue.Position, issue.EndPosition, modified_issue_message);
+ issues.AddIssue(modified_issue);
+ }
+ return issues;
+}
+
+TString NYql::NDq::GroupedIssues::IssueGroupInfo::InfoString() {
+ TString message = TStringBuilder() <<
+ "(appeared " << EncountersNumber <<
+ (EncountersNumber == 1?" time at ":" times, last at ") <<
+ LastEncountered.ToString() << ")";
+
+ return message;
+}
+
+bool NYql::NDq::GroupedIssues::Empty() {
+ return Issues.empty();
+}
+
+void NYql::NDq::GroupedIssues::AddIssue(const NYql::TIssue& issue) {
+ auto& inserted = Issues[issue];
+ ++inserted.EncountersNumber;
+ inserted.LastEncountered = TimeProvider->Now();
+ RemoveOldIssues();
+}
+
+void NYql::NDq::GroupedIssues::RemoveOldIssues() {
+ NYql::TIssue oldest;
+ IssueGroupInfo oldestInfo;
+ TVector<TIssue> toRemove;
+ for (auto& [issue, meta]: Issues) {
+ if (meta.LastEncountered < oldestInfo.LastEncountered ||
+ oldestInfo.LastEncountered == TInstant::Zero()) {
+ oldest = issue;
+ oldestInfo = meta;
+ }
+ if (meta.LastEncountered + IssueExpiration <= TimeProvider->Now()) {
+ toRemove.push_back(issue);
+ }
+ }
+ if (Issues.size() > MaxIssues) {
+ toRemove.push_back(oldest);
+ }
+ for (auto& issue: toRemove) {
+ if (Issues.contains(issue)) { // oldest could be added twice
+ Issues.erase(issue);
+ }
+ }
+}
+
+void NYql::NDq::GroupedIssues::AddIssues(const NYql::TIssues& issues) {
+ for (auto& issue: issues) {
+ AddIssue(issue);
+ }
+}
diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues.h b/ydb/library/yql/providers/dq/actors/grouped_issues.h
new file mode 100644
index 0000000000..0b447f5853
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/grouped_issues.h
@@ -0,0 +1,40 @@
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
+#include <library/cpp/time_provider/time_provider.h>
+
+#include <util/datetime/base.h>
+#include <util/string/builder.h>
+
+#include <algorithm>
+
+
+
+namespace NYql::NDq {
+struct GroupedIssues {
+ NYql::TIssues ToIssues();
+
+ explicit GroupedIssues(TIntrusivePtr<ITimeProvider> timeProvider): TimeProvider(timeProvider) {
+ }
+
+ struct IssueGroupInfo {
+ TString InfoString();
+
+ TInstant LastEncountered = TInstant::Zero();
+ uint64_t EncountersNumber;
+ };
+
+ bool Empty();
+
+ void AddIssue(const NYql::TIssue& issue);
+
+ void AddIssues(const NYql::TIssues& issues);
+
+ void RemoveOldIssues();
+
+ THashMap<NYql::TIssue, IssueGroupInfo> Issues;
+ TIntrusivePtr<ITimeProvider> TimeProvider;
+ TDuration IssueExpiration = TDuration::Hours(1);
+ ui64 MaxIssues = 20;
+
+};
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp
new file mode 100644
index 0000000000..c82121520d
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp
@@ -0,0 +1,90 @@
+#include <ydb/core/yq/libs/ydb/ydb.h>
+
+#include <ydb/library/security/ydb_credentials_provider_factory.h>
+#include <ydb/library/yql/providers/dq/actors/grouped_issues.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/time_provider/time_provider.h>
+
+#include <util/system/env.h>
+
+namespace NYq {
+
+using namespace NYql;
+
+struct AgileTimeProvider: ITimeProvider {
+ AgileTimeProvider(ui64 secs): Value(TInstant::Seconds(secs)) {
+ }
+
+ void IncreaseTime(ui64 secs_to_add) {
+ Value = TInstant::Seconds(Value.Seconds() + secs_to_add);
+ }
+
+ TInstant Now() {
+ return Value;
+ }
+
+ TInstant Value = Now();
+};
+
+TIntrusivePtr<AgileTimeProvider> CreateAgileTimeProvider(ui64 initial) {
+ return TIntrusivePtr<AgileTimeProvider>(new AgileTimeProvider(initial));
+}
+
+Y_UNIT_TEST_SUITE(TestIssuesGrouping) {
+ Y_UNIT_TEST(ShouldCountEveryIssue) {
+ int iterations = 4;
+ int issueTypes = 4;
+ int expectedNumberOfIssues[issueTypes];
+
+ for (int i = 0; i < iterations; ++i) {
+ NDq::GroupedIssues holder(CreateDefaultTimeProvider());
+ for (int j = 0; j < issueTypes; ++j) {
+ expectedNumberOfIssues[j] = rand() % 100;
+ for (int k = 0; k < expectedNumberOfIssues[j]; ++k) {
+ holder.AddIssue(TIssue(ToString(j)));
+ }
+ }
+ UNIT_ASSERT(holder.Issues.size() == 4);
+ for (int j = 0; j < issueTypes; ++j) {
+ int encounters = holder.Issues[TIssue(ToString(j))].EncountersNumber;
+ UNIT_ASSERT_C(encounters == expectedNumberOfIssues[j],
+ "expected " << expectedNumberOfIssues[j] << " got " << encounters << " at index " << j << " at iteration " << i);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(ShouldRemoveOldIssues) {
+ TIntrusivePtr<AgileTimeProvider> timeProvider = CreateAgileTimeProvider(1);
+ NDq::GroupedIssues holder(timeProvider);
+ holder.IssueExpiration = TDuration::Seconds(5);
+ holder.AddIssue(TIssue("a"));
+ timeProvider->IncreaseTime(10);
+ holder.AddIssue(TIssue("b"));
+ UNIT_ASSERT_C(holder.Issues.size() < 2, "old issue is not removed");
+ }
+
+ Y_UNIT_TEST(ShouldRemoveIfMoreThanMaxIssues) {
+ NDq::GroupedIssues holder(CreateDefaultTimeProvider());
+ for (int i = 0; i < 30; ++i) {
+ holder.AddIssue(TIssue(ToString(i)));
+ }
+ UNIT_ASSERT_C(holder.Issues.size() <= holder.MaxIssues, "overflow issues are not removed");
+ }
+
+ Y_UNIT_TEST(ShouldRemoveTheOldestIfMoreThanMaxIssues) {
+ TIntrusivePtr<AgileTimeProvider> timeProvider = CreateAgileTimeProvider(1);
+ NDq::GroupedIssues holder(timeProvider);
+ auto eldery = TIssue("there is a simple honor in poverty");
+ holder.AddIssue(eldery);
+ timeProvider->IncreaseTime(1);
+ for (int i = 0; i < 20; ++i) {
+ holder.AddIssue(TIssue(ToString(i)));
+ }
+ UNIT_ASSERT_C(!holder.Issues.contains(eldery), "the oldest issue is not removed");
+ }
+}
+} // namespace NYq
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index d173cb6d74..d5fbfe8cb0 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -4,6 +4,7 @@
#include "proto_builder.h"
#include "actor_helpers.h"
#include "executer_actor.h"
+#include "grouped_issues.h"
#include <ydb/library/yql/providers/dq/counters/counters.h>
@@ -62,6 +63,7 @@ public:
, ServiceCounters(serviceCounters, "task_controller")
, PingPeriod(pingPeriod)
, AggrPeriod(aggrPeriod)
+ , Issues(CreateDefaultTimeProvider())
{
if (Settings) {
if (Settings->_AllResultsBytesLimit.Get()) {
@@ -110,6 +112,11 @@ private:
OnError(statusCode, issues);
}
+ void SendNonFatalIssues() {
+ auto req = MakeHolder<TEvDqStats>(Issues.ToIssues());
+ Send(ExecuterId, req.Release());
+ }
+
void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
@@ -130,6 +137,10 @@ private:
}
}
+ TIssues localIssues;
+ // TODO: don't convert issues to string
+ NYql::IssuesFromMessage(state.GetIssues(), localIssues);
+
switch (state.GetState()) {
case NDqProto::COMPUTE_STATE_UNKNOWN: {
// TODO: use issues
@@ -138,17 +149,18 @@ private:
break;
}
case NDqProto::COMPUTE_STATE_FAILURE: {
- // TODO: don't convert issues to string
- NYql::IssuesFromMessage(state.GetIssues(), Issues);
- OnError(state.GetStatusCode(), Issues);
+ Issues.AddIssues(localIssues);
+ OnError(state.GetStatusCode(), Issues.ToIssues());
break;
}
case NDqProto::COMPUTE_STATE_EXECUTING: {
+ Issues.AddIssues(localIssues);
YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId;
if (!FinishedTasks.contains(taskId)) {
// may get late/reordered? message
Executing[taskId] = Now();
}
+ SendNonFatalIssues();
break;
}
case NDqProto::COMPUTE_STATE_FINISHED: {
@@ -522,7 +534,7 @@ private:
YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
FinalStat().FlushCounters(ev->Get()->Record);
if (!Issues.Empty()) {
- IssuesToMessage(Issues, ev->Get()->Record.MutableIssues());
+ IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues());
}
Send(ResultId, ev->Release().Release());
}
@@ -531,6 +543,7 @@ private:
return AggrPeriod ? AggregateQueryStatsByStage(TaskStat, Stages) : TaskStat;
}
+
bool ChannelsUpdated = false;
TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks;
THashSet<ui64> FinishedTasks;
@@ -547,7 +560,7 @@ private:
NYql::NCommon::TServiceCounters ServiceCounters;
TDuration PingPeriod = TDuration::Zero();
TDuration AggrPeriod = TDuration::Zero();
- TIssues Issues;
+ NYql::NDq::GroupedIssues Issues;
ui64 PingCookie = 0;
};
diff --git a/ydb/library/yql/providers/dq/actors/ut/CMakeLists.darwin.txt b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..92b8368d68
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.darwin.txt
@@ -0,0 +1,47 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-library-yql-providers-dq-actors-ut)
+target_compile_options(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors
+)
+target_link_libraries(ydb-library-yql-providers-dq-actors-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ providers-dq-actors
+ cpp-testing-unittest
+)
+target_link_options(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp
+)
+add_test(
+ NAME
+ ydb-library-yql-providers-dq-actors-ut
+ COMMAND
+ ydb-library-yql-providers-dq-actors-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-library-yql-providers-dq-actors-ut)
diff --git a/ydb/library/yql/providers/dq/actors/ut/CMakeLists.linux.txt b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.linux.txt
new file mode 100644
index 0000000000..5a9b83e399
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.linux.txt
@@ -0,0 +1,51 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-library-yql-providers-dq-actors-ut)
+target_compile_options(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors
+)
+target_link_libraries(ydb-library-yql-providers-dq-actors-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ providers-dq-actors
+ cpp-testing-unittest
+)
+target_link_options(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-library-yql-providers-dq-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp
+)
+add_test(
+ NAME
+ ydb-library-yql-providers-dq-actors-ut
+ COMMAND
+ ydb-library-yql-providers-dq-actors-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-library-yql-providers-dq-actors-ut)
diff --git a/ydb/library/yql/providers/dq/actors/ut/CMakeLists.txt b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.txt
new file mode 100644
index 0000000000..fc7b1ee73c
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX AND NOT APPLE)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index 8145334021..874b528c51 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -189,6 +189,10 @@ message TDqFailure {
NYql.NDqProto.StatusIds.StatusCode StatusCode = 8;
};
+message TDqStats {
+ repeated Ydb.Issue.IssueMessage Issues = 1;
+};
+
message TGraphRequest {
Yql.DqsProto.ExecuteGraphRequest Request = 1;
NActorsProto.TActorId ControlId = 2;
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index dc0825146c..0ab2a01e18 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -272,6 +272,7 @@ private:
TIssues issues { TIssue(errorBuilder) };
SINK_LOG_W("Got " << (res->IsTerminal ? "terminal " : "") << "error response[" << ev->Cookie << "] from solomon: " << issues.ToOneLineString());
+
Callbacks->OnAsyncOutputError(OutputIndex, issues, res->IsTerminal);
return;
}