diff options
author | bbiff <bbiff@yandex-team.com> | 2022-07-22 15:09:16 +0300 |
---|---|---|
committer | bbiff <bbiff@yandex-team.com> | 2022-07-22 15:09:16 +0300 |
commit | 52117eee30cec2122c9f5073c1299c6ea363ae6f (patch) | |
tree | 12f99d4a28e1253e076b9e5d37f7340f5474025c | |
parent | a11acf31af106017f9c4cead4e944993140b1e5f (diff) | |
download | ydb-52117eee30cec2122c9f5073c1299c6ea363ae6f.tar.gz |
Add Transient issues handling
18 files changed, 400 insertions, 8 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 4dc26d70da..98455e3778 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1362,6 +1362,7 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl) add_subdirectory(ydb/library/yql/providers/common/codec/ut) add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock) add_subdirectory(ydb/library/yql/providers/common/structured_token/ut) +add_subdirectory(ydb/library/yql/providers/dq/actors/ut) add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy) add_subdirectory(ydb/library/yql/providers/s3/path_generator/ut) add_subdirectory(ydb/library/yql/providers/s3/range_helpers/ut) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index bf49da37f2..8fd26941e7 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -1383,6 +1383,7 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl) add_subdirectory(ydb/library/yql/providers/common/codec/ut) add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock) add_subdirectory(ydb/library/yql/providers/common/structured_token/ut) +add_subdirectory(ydb/library/yql/providers/dq/actors/ut) add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy) add_subdirectory(ydb/library/yql/providers/s3/path_generator/ut) add_subdirectory(ydb/library/yql/providers/s3/range_helpers/ut) diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index c2aca7904e..f76aff64ed 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -336,6 +336,7 @@ private: hFunc(TEvents::TEvForwardPingResponse, Handle); hFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone, Handle); hFunc(TEvents::TEvRaiseTransientIssues, Handle); + hFunc(TEvDqStats, Handle); ) STRICT_STFUNC(FinishStateFunc, @@ -351,6 +352,7 @@ private: IgnoreFunc(TEvents::TEvQueryActionResult); IgnoreFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone); IgnoreFunc(TEvents::TEvRaiseTransientIssues); + IgnoreFunc(TEvDqStats); ) void KillExecuter() { @@ -714,6 +716,12 @@ private: Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, RaiseTransientIssuesCookie); } + void Handle(TEvDqStats::TPtr& ev) { + Fq::Private::PingTaskRequest request; + *request.mutable_transient_issues() = ev->Get()->Record.issues(); + Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0); + } + i32 UpdateResultIndices() { i32 count = 0; for (const auto& graphParams : DqGraphParams) { diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 9666cc9b20..685108ff31 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -107,6 +107,35 @@ private: WaitingForStateResponse.push_back({ev->Sender, ev->Cookie}); } + void FillIssues(NYql::TIssues& issues) { + auto applyToAllIssuesHolders = [this](auto& function){ + function(SourcesMap); + function(InputTransformsMap); + function(SinksMap); + function(OutputTransformsMap); + }; + + uint64_t expectingSize = 0; + auto addSize = [&expectingSize](auto& issuesHolder) { + for (auto& [inputIndex, source]: issuesHolder) { + expectingSize += source.IssuesBuffer.GetAllAddedIssuesCount(); + } + }; + + auto exhaustIssues = [&issues](auto& issuesHolder){ + for (auto& [inputIndex, source]: issuesHolder) { + auto sourceIssues = source.IssuesBuffer.Dump(); + for (auto& issueInfo: sourceIssues) { + issues.AddIssues(issueInfo.Issues); + } + } + }; + + applyToAllIssuesHolders(addSize); + issues.Reserve(expectingSize); + applyToAllIssuesHolders(exhaustIssues); + } + void OnStatisticsResponse(NTaskRunnerActor::TEvStatistics::TPtr& ev) { SentStatsRequest = false; if (ev->Get()->Stats) { @@ -117,6 +146,10 @@ private: record.SetState(NDqProto::COMPUTE_STATE_EXECUTING); record.SetStatusCode(NYql::NDqProto::StatusIds::SUCCESS); record.SetTaskId(Task.GetId()); + NYql::TIssues issues; + FillIssues(issues); + + IssuesToMessage(issues, record.MutableIssues()); FillStats(record.MutableStats(), /* last */ false); for (const auto& [actorId, cookie] : WaitingForStateResponse) { auto state = MakeHolder<TEvDqCompute::TEvState>(); diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index c60a5b1f4e..d2e1395871 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -58,6 +58,7 @@ struct TBaseDqExecuterEvents { ES_GRAPH, ES_GRAPH_FINISHED, ES_GRAPH_EXECUTION_EVENT, + ES_STATS, }; }; diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.txt index a607fb65d8..82a98109d3 100644 --- a/ydb/library/yql/providers/dq/actors/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/actors/CMakeLists.txt @@ -58,4 +58,5 @@ target_sources(providers-dq-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/result_receiver.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/full_result_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/proto_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/grouped_issues.cpp ) diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 4582deaee8..99b8813e61 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -24,12 +24,16 @@ namespace NYql::NDqs { /* TIssue(error).SetCode( - needFallback ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR), + needFallback ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR), */ - TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString&) + TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString&) : TEvDqFailure(statusCode) { } + TEvDqStats::TEvDqStats(const TIssues& issues) { + IssuesToMessage(issues, Record.mutable_issues()); + } + TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) { Record = std::move(queryResult); } diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index 220eb453ec..f226953447 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -27,6 +27,11 @@ namespace NYql::NDqs { TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error); }; + struct TEvDqStats : NActors::TEventPB<TEvDqStats, NDqProto::TDqStats, TDqExecuterEvents::ES_STATS> { + TEvDqStats() = default; + TEvDqStats(const TIssues& issues); + }; + struct TEvQueryResponse : NActors::TEventPB<TEvQueryResponse, NDqProto::TQueryResponse, TDqExecuterEvents::ES_RESULT_SET> { TEvQueryResponse() = default; @@ -115,7 +120,7 @@ namespace NYql::NDqs { DEFINE_SIMPLE_LOCAL_EVENT(TEvMessageProcessed, ""); explicit TEvMessageProcessed(const TString& messageId); - + const TString MessageId; }; } diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 9caf03e4ff..0b6fdfd3a2 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -76,6 +76,7 @@ private: HFunc(TEvAllocateWorkersResponse, OnAllocateWorkersResponse); cFunc(TEvents::TEvPoison::EventType, PassAway); hFunc(NActors::TEvents::TEvPoisonTaken, Handle); + hFunc(TEvDqStats, OnTransientIssues); HFunc(TEvDqFailure, OnFailure); HFunc(TEvGraphFinished, OnGraphFinished); HFunc(TEvQueryResponse, OnQueryResponse); @@ -296,6 +297,12 @@ private: PassAway(); } + void OnTransientIssues(TEvDqStats::TPtr& ev) { + YQL_LOG_CTX_ROOT_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; + Send(PrinterId, ev->Release().Release()); + } + void Handle(NActors::TEvents::TEvPoisonTaken::TPtr&) { // ignore ack from checkpoint coordinator now } diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp new file mode 100644 index 0000000000..2d955fadea --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp @@ -0,0 +1,72 @@ +#include "grouped_issues.h" + + +NYql::TIssues NYql::NDq::GroupedIssues::ToIssues() { + NYql::TIssues issues; + TVector<std::pair<NYql::TIssue, IssueGroupInfo>> issueVector; + + for (auto& p: Issues) { + issueVector.push_back(p); + } + + std::sort(issueVector.begin(), issueVector.end(), [](const auto& a, const auto& b) -> bool { + return a.second.LastEncountered > b.second.LastEncountered; + }); + + for (auto& [issue, meta]: issueVector) { + auto modified_issue_message = issue.Message + " " + meta.InfoString(); + auto modified_issue = NYql::TIssue(issue.Position, issue.EndPosition, modified_issue_message); + issues.AddIssue(modified_issue); + } + return issues; +} + +TString NYql::NDq::GroupedIssues::IssueGroupInfo::InfoString() { + TString message = TStringBuilder() << + "(appeared " << EncountersNumber << + (EncountersNumber == 1?" time at ":" times, last at ") << + LastEncountered.ToString() << ")"; + + return message; +} + +bool NYql::NDq::GroupedIssues::Empty() { + return Issues.empty(); +} + +void NYql::NDq::GroupedIssues::AddIssue(const NYql::TIssue& issue) { + auto& inserted = Issues[issue]; + ++inserted.EncountersNumber; + inserted.LastEncountered = TimeProvider->Now(); + RemoveOldIssues(); +} + +void NYql::NDq::GroupedIssues::RemoveOldIssues() { + NYql::TIssue oldest; + IssueGroupInfo oldestInfo; + TVector<TIssue> toRemove; + for (auto& [issue, meta]: Issues) { + if (meta.LastEncountered < oldestInfo.LastEncountered || + oldestInfo.LastEncountered == TInstant::Zero()) { + oldest = issue; + oldestInfo = meta; + } + if (meta.LastEncountered + IssueExpiration <= TimeProvider->Now()) { + toRemove.push_back(issue); + } + } + if (Issues.size() > MaxIssues) { + toRemove.push_back(oldest); + } + for (auto& issue: toRemove) { + if (Issues.contains(issue)) { // oldest could be added twice + Issues.erase(issue); + } + } +} + +void NYql::NDq::GroupedIssues::AddIssues(const NYql::TIssues& issues) { + for (auto& issue: issues) { + AddIssue(issue); + } +} diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues.h b/ydb/library/yql/providers/dq/actors/grouped_issues.h new file mode 100644 index 0000000000..0b447f5853 --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/grouped_issues.h @@ -0,0 +1,40 @@ +#include <ydb/library/yql/public/issue/yql_issue.h> + +#include <library/cpp/time_provider/time_provider.h> + +#include <util/datetime/base.h> +#include <util/string/builder.h> + +#include <algorithm> + + + +namespace NYql::NDq { +struct GroupedIssues { + NYql::TIssues ToIssues(); + + explicit GroupedIssues(TIntrusivePtr<ITimeProvider> timeProvider): TimeProvider(timeProvider) { + } + + struct IssueGroupInfo { + TString InfoString(); + + TInstant LastEncountered = TInstant::Zero(); + uint64_t EncountersNumber; + }; + + bool Empty(); + + void AddIssue(const NYql::TIssue& issue); + + void AddIssues(const NYql::TIssues& issues); + + void RemoveOldIssues(); + + THashMap<NYql::TIssue, IssueGroupInfo> Issues; + TIntrusivePtr<ITimeProvider> TimeProvider; + TDuration IssueExpiration = TDuration::Hours(1); + ui64 MaxIssues = 20; + +}; +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp new file mode 100644 index 0000000000..c82121520d --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp @@ -0,0 +1,90 @@ +#include <ydb/core/yq/libs/ydb/ydb.h> + +#include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <ydb/library/yql/providers/dq/actors/grouped_issues.h> + +#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/time_provider/time_provider.h> + +#include <util/system/env.h> + +namespace NYq { + +using namespace NYql; + +struct AgileTimeProvider: ITimeProvider { + AgileTimeProvider(ui64 secs): Value(TInstant::Seconds(secs)) { + } + + void IncreaseTime(ui64 secs_to_add) { + Value = TInstant::Seconds(Value.Seconds() + secs_to_add); + } + + TInstant Now() { + return Value; + } + + TInstant Value = Now(); +}; + +TIntrusivePtr<AgileTimeProvider> CreateAgileTimeProvider(ui64 initial) { + return TIntrusivePtr<AgileTimeProvider>(new AgileTimeProvider(initial)); +} + +Y_UNIT_TEST_SUITE(TestIssuesGrouping) { + Y_UNIT_TEST(ShouldCountEveryIssue) { + int iterations = 4; + int issueTypes = 4; + int expectedNumberOfIssues[issueTypes]; + + for (int i = 0; i < iterations; ++i) { + NDq::GroupedIssues holder(CreateDefaultTimeProvider()); + for (int j = 0; j < issueTypes; ++j) { + expectedNumberOfIssues[j] = rand() % 100; + for (int k = 0; k < expectedNumberOfIssues[j]; ++k) { + holder.AddIssue(TIssue(ToString(j))); + } + } + UNIT_ASSERT(holder.Issues.size() == 4); + for (int j = 0; j < issueTypes; ++j) { + int encounters = holder.Issues[TIssue(ToString(j))].EncountersNumber; + UNIT_ASSERT_C(encounters == expectedNumberOfIssues[j], + "expected " << expectedNumberOfIssues[j] << " got " << encounters << " at index " << j << " at iteration " << i); + } + } + } + + Y_UNIT_TEST(ShouldRemoveOldIssues) { + TIntrusivePtr<AgileTimeProvider> timeProvider = CreateAgileTimeProvider(1); + NDq::GroupedIssues holder(timeProvider); + holder.IssueExpiration = TDuration::Seconds(5); + holder.AddIssue(TIssue("a")); + timeProvider->IncreaseTime(10); + holder.AddIssue(TIssue("b")); + UNIT_ASSERT_C(holder.Issues.size() < 2, "old issue is not removed"); + } + + Y_UNIT_TEST(ShouldRemoveIfMoreThanMaxIssues) { + NDq::GroupedIssues holder(CreateDefaultTimeProvider()); + for (int i = 0; i < 30; ++i) { + holder.AddIssue(TIssue(ToString(i))); + } + UNIT_ASSERT_C(holder.Issues.size() <= holder.MaxIssues, "overflow issues are not removed"); + } + + Y_UNIT_TEST(ShouldRemoveTheOldestIfMoreThanMaxIssues) { + TIntrusivePtr<AgileTimeProvider> timeProvider = CreateAgileTimeProvider(1); + NDq::GroupedIssues holder(timeProvider); + auto eldery = TIssue("there is a simple honor in poverty"); + holder.AddIssue(eldery); + timeProvider->IncreaseTime(1); + for (int i = 0; i < 20; ++i) { + holder.AddIssue(TIssue(ToString(i))); + } + UNIT_ASSERT_C(!holder.Issues.contains(eldery), "the oldest issue is not removed"); + } +} +} // namespace NYq diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index d173cb6d74..d5fbfe8cb0 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -4,6 +4,7 @@ #include "proto_builder.h" #include "actor_helpers.h" #include "executer_actor.h" +#include "grouped_issues.h" #include <ydb/library/yql/providers/dq/counters/counters.h> @@ -62,6 +63,7 @@ public: , ServiceCounters(serviceCounters, "task_controller") , PingPeriod(pingPeriod) , AggrPeriod(aggrPeriod) + , Issues(CreateDefaultTimeProvider()) { if (Settings) { if (Settings->_AllResultsBytesLimit.Get()) { @@ -110,6 +112,11 @@ private: OnError(statusCode, issues); } + void SendNonFatalIssues() { + auto req = MakeHolder<TEvDqStats>(Issues.ToIssues()); + Send(ExecuterId, req.Release()); + } + void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; @@ -130,6 +137,10 @@ private: } } + TIssues localIssues; + // TODO: don't convert issues to string + NYql::IssuesFromMessage(state.GetIssues(), localIssues); + switch (state.GetState()) { case NDqProto::COMPUTE_STATE_UNKNOWN: { // TODO: use issues @@ -138,17 +149,18 @@ private: break; } case NDqProto::COMPUTE_STATE_FAILURE: { - // TODO: don't convert issues to string - NYql::IssuesFromMessage(state.GetIssues(), Issues); - OnError(state.GetStatusCode(), Issues); + Issues.AddIssues(localIssues); + OnError(state.GetStatusCode(), Issues.ToIssues()); break; } case NDqProto::COMPUTE_STATE_EXECUTING: { + Issues.AddIssues(localIssues); YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId; if (!FinishedTasks.contains(taskId)) { // may get late/reordered? message Executing[taskId] = Now(); } + SendNonFatalIssues(); break; } case NDqProto::COMPUTE_STATE_FINISHED: { @@ -522,7 +534,7 @@ private: YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); FinalStat().FlushCounters(ev->Get()->Record); if (!Issues.Empty()) { - IssuesToMessage(Issues, ev->Get()->Record.MutableIssues()); + IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues()); } Send(ResultId, ev->Release().Release()); } @@ -531,6 +543,7 @@ private: return AggrPeriod ? AggregateQueryStatsByStage(TaskStat, Stages) : TaskStat; } + bool ChannelsUpdated = false; TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks; THashSet<ui64> FinishedTasks; @@ -547,7 +560,7 @@ private: NYql::NCommon::TServiceCounters ServiceCounters; TDuration PingPeriod = TDuration::Zero(); TDuration AggrPeriod = TDuration::Zero(); - TIssues Issues; + NYql::NDq::GroupedIssues Issues; ui64 PingCookie = 0; }; diff --git a/ydb/library/yql/providers/dq/actors/ut/CMakeLists.darwin.txt b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.darwin.txt new file mode 100644 index 0000000000..92b8368d68 --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.darwin.txt @@ -0,0 +1,47 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-providers-dq-actors-ut) +target_compile_options(ydb-library-yql-providers-dq-actors-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-providers-dq-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors +) +target_link_libraries(ydb-library-yql-providers-dq-actors-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + providers-dq-actors + cpp-testing-unittest +) +target_link_options(ydb-library-yql-providers-dq-actors-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-library-yql-providers-dq-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp +) +add_test( + NAME + ydb-library-yql-providers-dq-actors-ut + COMMAND + ydb-library-yql-providers-dq-actors-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-library-yql-providers-dq-actors-ut) diff --git a/ydb/library/yql/providers/dq/actors/ut/CMakeLists.linux.txt b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.linux.txt new file mode 100644 index 0000000000..5a9b83e399 --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.linux.txt @@ -0,0 +1,51 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-providers-dq-actors-ut) +target_compile_options(ydb-library-yql-providers-dq-actors-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-providers-dq-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors +) +target_link_libraries(ydb-library-yql-providers-dq-actors-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + providers-dq-actors + cpp-testing-unittest +) +target_link_options(ydb-library-yql-providers-dq-actors-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-library-yql-providers-dq-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp +) +add_test( + NAME + ydb-library-yql-providers-dq-actors-ut + COMMAND + ydb-library-yql-providers-dq-actors-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-library-yql-providers-dq-actors-ut) diff --git a/ydb/library/yql/providers/dq/actors/ut/CMakeLists.txt b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.txt new file mode 100644 index 0000000000..fc7b1ee73c --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/ut/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 8145334021..874b528c51 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -189,6 +189,10 @@ message TDqFailure { NYql.NDqProto.StatusIds.StatusCode StatusCode = 8; }; +message TDqStats { + repeated Ydb.Issue.IssueMessage Issues = 1; +}; + message TGraphRequest { Yql.DqsProto.ExecuteGraphRequest Request = 1; NActorsProto.TActorId ControlId = 2; diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index dc0825146c..0ab2a01e18 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -272,6 +272,7 @@ private: TIssues issues { TIssue(errorBuilder) }; SINK_LOG_W("Got " << (res->IsTerminal ? "terminal " : "") << "error response[" << ev->Cookie << "] from solomon: " << issues.ToOneLineString()); + Callbacks->OnAsyncOutputError(OutputIndex, issues, res->IsTerminal); return; } |