diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-01 12:24:10 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-01 12:42:02 +0300 |
commit | cc871496ef02852bf6ca96470fa61c253940ddf4 (patch) | |
tree | a11a70eafe634c39c7f5ab7a6fabb12667ea5d63 /yt/yql/plugin/native | |
parent | 79de9208b0944f6eba265db2b34f4c36735dd06c (diff) | |
download | ydb-cc871496ef02852bf6ca96470fa61c253940ddf4.tar.gz |
Intermediate changes
Diffstat (limited to 'yt/yql/plugin/native')
-rw-r--r-- | yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt | 5 | ||||
-rw-r--r-- | yt/yql/plugin/native/CMakeLists.linux-aarch64.txt | 5 | ||||
-rw-r--r-- | yt/yql/plugin/native/CMakeLists.linux-x86_64.txt | 5 | ||||
-rw-r--r-- | yt/yql/plugin/native/CMakeLists.windows-x86_64.txt | 5 | ||||
-rw-r--r-- | yt/yql/plugin/native/error_helpers.cpp | 4 | ||||
-rw-r--r-- | yt/yql/plugin/native/error_helpers.h | 2 | ||||
-rw-r--r-- | yt/yql/plugin/native/plugin.cpp | 70 | ||||
-rw-r--r-- | yt/yql/plugin/native/progress_merger.cpp | 116 | ||||
-rw-r--r-- | yt/yql/plugin/native/progress_merger.h | 38 | ||||
-rw-r--r-- | yt/yql/plugin/native/ya.make | 3 |
10 files changed, 239 insertions, 14 deletions
diff --git a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt index 753bac36c8..7c002160ee 100644 --- a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt +++ b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt index 348d9ca192..b85008e3c5 100644 --- a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt +++ b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt @@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt index 348d9ca192..b85008e3c5 100644 --- a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt +++ b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt @@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt index 753bac36c8..7c002160ee 100644 --- a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt +++ b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/error_helpers.cpp b/yt/yql/plugin/native/error_helpers.cpp index 23b9fa5cba..8301c341f9 100644 --- a/yt/yql/plugin/native/error_helpers.cpp +++ b/yt/yql/plugin/native/error_helpers.cpp @@ -14,7 +14,7 @@ const int IssueToErrorCodesShift = 30000; //////////////////////////////////////////////////////////////////////////////// -TString ExceptionToYtErrorYson(const std::exception& exception) +TString MessageToYtErrorYson(const TString& message) { TStringStream yson; ::NYson::TYsonWriter writer(&yson); @@ -23,7 +23,7 @@ TString ExceptionToYtErrorYson(const std::exception& exception) writer.OnKeyedItem("code"); writer.OnInt64Scalar(1); // Generic error writer.OnKeyedItem("message"); - writer.OnStringScalar(exception.what()); + writer.OnStringScalar(message); writer.OnKeyedItem("attributes"); writer.OnBeginMap(); writer.OnEndMap(); diff --git a/yt/yql/plugin/native/error_helpers.h b/yt/yql/plugin/native/error_helpers.h index 1905502aa8..4e57cdba40 100644 --- a/yt/yql/plugin/native/error_helpers.h +++ b/yt/yql/plugin/native/error_helpers.h @@ -10,7 +10,7 @@ namespace NYT::NYqlPlugin { TString IssuesToYtErrorYson(const NYql::TIssues& issues); -TString ExceptionToYtErrorYson(const std::exception& exception); +TString MessageToYtErrorYson(const TString& message); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp index 32a632b59e..12a7586851 100644 --- a/yt/yql/plugin/native/plugin.cpp +++ b/yt/yql/plugin/native/plugin.cpp @@ -1,6 +1,7 @@ #include "plugin.h" #include "error_helpers.h" +#include "progress_merger.h" #include <ydb/library/yql/providers/yt/lib/log/yt_logger.h> #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h> @@ -26,8 +27,9 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/logging/logger.h> -#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yt/threading/rw_spin_lock.h> +#include <library/cpp/yson/node/node_io.h> #include <library/cpp/yson/parser.h> #include <library/cpp/yson/writer.h> @@ -47,6 +49,14 @@ using namespace NYson; //////////////////////////////////////////////////////////////////////////////// +struct TActiveQuery +{ + TProgressMerger ProgressMerger; + std::optional<TString> Plan; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TYqlPlugin : public IYqlPlugin { @@ -171,7 +181,7 @@ public: YQL_LOG(INFO) << "YQL plugin initialized"; } - TQueryResult GuardedRun(TString impersonationUser, TString queryText, TYsonString settings) + TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) { auto credentials = MakeIntrusive<NYql::TCredentials>(); if (YTTokenPath_) { @@ -187,6 +197,19 @@ public: auto program = ProgramFactory_->Create("-memory-", queryText); program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings)); + auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> { + if (!maybeStr) { + return std::nullopt; + } + return *maybeStr; + }; + + program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress); + ActiveQueriesProgress_[queryId].Plan = maybeToOptional(program->GetQueryPlan()); + }); + NSQLTranslation::TTranslationSettings sqlSettings; sqlSettings.ClusterMapping = Clusters_; sqlSettings.ModuleMapping = Modules_; @@ -228,28 +251,51 @@ public: yson.OnEndList(); } - auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> { - if (!maybeStr) { - return std::nullopt; - } - return *maybeStr; - }; + TString progress; + { + auto guard = WriterGuard(ProgressSpinLock); + progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString(); + ActiveQueriesProgress_.erase(queryId); + } return { .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()), .Plan = maybeToOptional(program->GetQueryPlan()), .Statistics = maybeToOptional(program->GetStatistics()), + .Progress = progress, .TaskInfo = maybeToOptional(program->GetTasksInfo()), }; } - TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept override + TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept override { try { - return GuardedRun(impersonationUser, queryText, settings); + return GuardedRun(queryId, impersonationUser, queryText, settings); } catch (const std::exception& ex) { + { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_.erase(queryId); + } + + return TQueryResult{ + .YsonError = MessageToYtErrorYson(ex.what()), + }; + } + } + + TQueryResult GetProgress(TQueryId queryId) noexcept override + { + auto guard = ReaderGuard(ProgressSpinLock); + if (ActiveQueriesProgress_.contains(queryId)) { + TQueryResult result; + if (ActiveQueriesProgress_[queryId].ProgressMerger.HasChangesSinceLastFlush()) { + result.Plan = ActiveQueriesProgress_[queryId].Plan; + result.Progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString(); + } + return result; + } else { return TQueryResult{ - .YsonError = ExceptionToYtErrorYson(ex), + .YsonError = MessageToYtErrorYson(Format("No progress for queryId: %v", queryId)), }; } } @@ -266,6 +312,8 @@ private: std::optional<TString> DefaultCluster_; THashMap<TString, TString> Modules_; TYsonString OperationAttributes_; + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ProgressSpinLock); + THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_; TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings) { diff --git a/yt/yql/plugin/native/progress_merger.cpp b/yt/yql/plugin/native/progress_merger.cpp new file mode 100644 index 0000000000..3a0a02604e --- /dev/null +++ b/yt/yql/plugin/native/progress_merger.cpp @@ -0,0 +1,116 @@ +#include "progress_merger.h" + + +namespace NYT::NYqlPlugin { + +////////////////////////////////////////////////////////////////////////////// + +TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p) + : TNodeProgressBase(p) +{} + +void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const +{ + writer.OnBeginMap(); + { + writer.OnKeyedItem("category"); + writer.OnStringScalar(Progress_.Category); + + writer.OnKeyedItem("state"); + writer.OnStringScalar(ToString(Progress_.State)); + + writer.OnKeyedItem("remoteId"); + writer.OnStringScalar(Progress_.RemoteId); + + writer.OnKeyedItem("stages"); + writer.OnBeginMap(); + for (size_t index = 0; index < Stages_.size(); index++) { + writer.OnKeyedItem(ToString(index)); + writer.OnBeginMap(); + { + writer.OnKeyedItem(Stages_[index].first); + writer.OnStringScalar(Stages_[index].second.ToString()); + } + writer.OnEndMap(); + } + writer.OnEndMap(); + + if (Progress_.Counters) { + writer.OnKeyedItem("completed"); + writer.OnUint64Scalar(Progress_.Counters->Completed); + + writer.OnKeyedItem("running"); + writer.OnUint64Scalar(Progress_.Counters->Running); + + writer.OnKeyedItem("total"); + writer.OnUint64Scalar(Progress_.Counters->Total); + + writer.OnKeyedItem("aborted"); + writer.OnUint64Scalar(Progress_.Counters->Aborted); + + writer.OnKeyedItem("failed"); + writer.OnUint64Scalar(Progress_.Counters->Failed); + + writer.OnKeyedItem("lost"); + writer.OnUint64Scalar(Progress_.Counters->Lost); + + writer.OnKeyedItem("pending"); + writer.OnUint64Scalar(Progress_.Counters->Pending); + } + + writer.OnKeyedItem("startedAt"); + writer.OnStringScalar(StartedAt_.ToString()); + + if (FinishedAt_ != TInstant::Max()) { + writer.OnKeyedItem("finishedAt"); + writer.OnStringScalar(FinishedAt_.ToString()); + } + } + writer.OnEndMap(); +} + +////////////////////////////////////////////////////////////////////////////// + +void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress) +{ + auto in = NodesMap_.emplace(progress.Id, progress); + if (!in.second) { + in.first->second.MergeWith(progress); + } + HasChanges_ = true; +} + +void TProgressMerger::AbortAllUnfinishedNodes() +{ + for (auto& node: NodesMap_) { + if (node.second.IsUnfinished()) { + node.second.Abort(); + HasChanges_ = true; + } + } +} + +TString TProgressMerger::ToYsonString() +{ + TStringStream yson; + ::NYson::TYsonWriter writer(&yson); + + writer.OnBeginMap(); + for (auto& node: NodesMap_) { + writer.OnKeyedItem(ToString(node.first)); + node.second.Serialize(writer); + } + writer.OnEndMap(); + HasChanges_ = false; + + return yson.Str(); +} + +bool TProgressMerger::HasChangesSinceLastFlush() const +{ + return HasChanges_; +} + +////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYqlPlugin diff --git a/yt/yql/plugin/native/progress_merger.h b/yt/yql/plugin/native/progress_merger.h new file mode 100644 index 0000000000..3bb5f101d0 --- /dev/null +++ b/yt/yql/plugin/native/progress_merger.h @@ -0,0 +1,38 @@ +#pragma once + +#include <ydb/library/yql/core/yql_execution.h> +#include <ydb/library/yql/core/progress_merger/progress_merger.h> + +#include <library/cpp/yson/writer.h> + +namespace NYT::NYqlPlugin { + +using namespace NYql::NProgressMerger; +using namespace NYson; + +////////////////////////////////////////////////////////////////////////////// + +class TNodeProgress : public TNodeProgressBase { +public: + TNodeProgress(const NYql::TOperationProgress& p); + void Serialize(::NYson::TYsonWriter& yson) const; +}; + +////////////////////////////////////////////////////////////////////////////// + +class TProgressMerger : public ITaskProgressMerger { +public: + void MergeWith(const NYql::TOperationProgress& progress) override; + void AbortAllUnfinishedNodes() override; + + bool HasChangesSinceLastFlush() const; + TString ToYsonString(); + +private: + bool HasChanges_ = false; + THashMap<ui32, TNodeProgress> NodesMap_; +}; + +////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYqlPlugin diff --git a/yt/yql/plugin/native/ya.make b/yt/yql/plugin/native/ya.make index 36d562a4d2..15f3851411 100644 --- a/yt/yql/plugin/native/ya.make +++ b/yt/yql/plugin/native/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( GLOBAL plugin.cpp error_helpers.cpp + progress_merger.cpp ) PEERDIR( @@ -19,6 +20,7 @@ PEERDIR( ydb/library/yql/core/file_storage ydb/library/yql/core/file_storage/proto ydb/library/yql/core/file_storage/http_download + ydb/library/yql/core/progress_merger ydb/library/yql/core/services/mounts ydb/library/yql/core/user_data ydb/library/yql/minikql @@ -30,6 +32,7 @@ PEERDIR( ydb/library/yql/providers/common/udf_resolve ydb/library/yql/providers/solomon/gateway ydb/library/yql/providers/solomon/provider + ydb/library/yql/core ydb/library/yql/core/url_preprocessing ydb/library/yql/providers/yt/gateway/native ydb/library/yql/providers/yt/lib/log |