diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-01 12:24:10 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-09-01 12:42:02 +0300 |
commit | cc871496ef02852bf6ca96470fa61c253940ddf4 (patch) | |
tree | a11a70eafe634c39c7f5ab7a6fabb12667ea5d63 | |
parent | 79de9208b0944f6eba265db2b34f4c36735dd06c (diff) | |
download | ydb-cc871496ef02852bf6ca96470fa61c253940ddf4.tar.gz |
Intermediate changes
27 files changed, 568 insertions, 47 deletions
diff --git a/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt index a29f25c4c6..0e86b20c36 100644 --- a/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt @@ -17,6 +17,7 @@ add_subdirectory(facade) add_subdirectory(file_storage) add_subdirectory(issue) add_subdirectory(peephole_opt) +add_subdirectory(progress_merger) add_subdirectory(services) add_subdirectory(spilling) add_subdirectory(sql_types) diff --git a/ydb/library/yql/core/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/CMakeLists.linux-aarch64.txt index ad218bf318..b0e95587bd 100644 --- a/ydb/library/yql/core/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/core/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ add_subdirectory(facade) add_subdirectory(file_storage) add_subdirectory(issue) add_subdirectory(peephole_opt) +add_subdirectory(progress_merger) add_subdirectory(services) add_subdirectory(spilling) add_subdirectory(sql_types) diff --git a/ydb/library/yql/core/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/CMakeLists.linux-x86_64.txt index ad218bf318..b0e95587bd 100644 --- a/ydb/library/yql/core/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/core/CMakeLists.linux-x86_64.txt @@ -17,6 +17,7 @@ add_subdirectory(facade) add_subdirectory(file_storage) add_subdirectory(issue) add_subdirectory(peephole_opt) +add_subdirectory(progress_merger) add_subdirectory(services) add_subdirectory(spilling) add_subdirectory(sql_types) diff --git a/ydb/library/yql/core/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/CMakeLists.windows-x86_64.txt index a29f25c4c6..0e86b20c36 100644 --- a/ydb/library/yql/core/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/core/CMakeLists.windows-x86_64.txt @@ -17,6 +17,7 @@ add_subdirectory(facade) add_subdirectory(file_storage) add_subdirectory(issue) add_subdirectory(peephole_opt) +add_subdirectory(progress_merger) add_subdirectory(services) add_subdirectory(spilling) add_subdirectory(sql_types) diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..3e9b4c5217 --- /dev/null +++ b/ydb/library/yql/core/progress_merger/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-core-progress_merger) +target_link_libraries(yql-core-progress_merger PUBLIC + contrib-libs-cxxsupp + yutil + library-yql-core +) +target_sources(yql-core-progress_merger PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp +) diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..a70e773280 --- /dev/null +++ b/ydb/library/yql/core/progress_merger/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-core-progress_merger) +target_link_libraries(yql-core-progress_merger PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-core +) +target_sources(yql-core-progress_merger PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp +) diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..a70e773280 --- /dev/null +++ b/ydb/library/yql/core/progress_merger/CMakeLists.linux-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-core-progress_merger) +target_link_libraries(yql-core-progress_merger PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-core +) +target_sources(yql-core-progress_merger PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp +) diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.txt b/ydb/library/yql/core/progress_merger/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/core/progress_merger/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..3e9b4c5217 --- /dev/null +++ b/ydb/library/yql/core/progress_merger/CMakeLists.windows-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-core-progress_merger) +target_link_libraries(yql-core-progress_merger PUBLIC + contrib-libs-cxxsupp + yutil + library-yql-core +) +target_sources(yql-core-progress_merger PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp +) diff --git a/ydb/library/yql/core/progress_merger/progress_merger.cpp b/ydb/library/yql/core/progress_merger/progress_merger.cpp new file mode 100644 index 0000000000..e64b9ac909 --- /dev/null +++ b/ydb/library/yql/core/progress_merger/progress_merger.cpp @@ -0,0 +1,89 @@ +#include "progress_merger.h" + + +namespace NYql::NProgressMerger { + +////////////////////////////////////////////////////////////////////////////// +// TNodeProgressBase +////////////////////////////////////////////////////////////////////////////// + +TNodeProgressBase::TNodeProgressBase(const TOperationProgress& p) + : Progress_(p) + , StartedAt_(TInstant::Now()) + , FinishedAt_(p.State == EState::Finished ? StartedAt_ : TInstant::Max()) + , Dirty_(true) +{ + if (!p.Stage.first.empty()) { + Stages_.emplace_back(p.Stage); + } +} + +TNodeProgressBase::TNodeProgressBase( + const TOperationProgress& p, + TInstant startedAt, + TInstant finishedAt, + const TVector<TOperationProgress::TStage>& stages) + : Progress_(p) + , StartedAt_(startedAt) + , FinishedAt_(finishedAt) + , Stages_(stages) + , Dirty_(true) +{} + + +bool TNodeProgressBase::MergeWith(const TOperationProgress& p) { + bool dirty = false; + + // (1) remote id + if (!p.RemoteId.empty() && p.RemoteId != Progress_.RemoteId) { + Progress_.RemoteId = p.RemoteId; + dirty = true; + } + + // (2) state + if (p.State != Progress_.State) { + Progress_.State = p.State; + dirty = true; + } + + // (3) counters + if (p.Counters && (!Progress_.Counters || *p.Counters != *Progress_.Counters)) { + Progress_.Counters = p.Counters; + dirty = true; + } + + // (4) finished time + if (Progress_.State == EState::Finished) { + FinishedAt_ = TInstant::Now(); + dirty = true; + } + + // (5) stage + if (!p.Stage.first.empty() && Progress_.Stage != p.Stage) { + Progress_.Stage = p.Stage; + Stages_.push_back(p.Stage); + dirty = true; + } + return Dirty_ = dirty; +} + +void TNodeProgressBase::Abort() { + Progress_.State = EState::Aborted; + FinishedAt_ = TInstant::Now(); + Dirty_ = true; +} + +bool TNodeProgressBase::IsUnfinished() const { + return Progress_.State == EState::Started || + Progress_.State == EState::InProgress; +} + +bool TNodeProgressBase::IsDirty() const { + return Dirty_; +} + +void TNodeProgressBase::SetDirty(bool dirty) { + Dirty_ = dirty; +} + +} // namespace NYql::NProgressMerger diff --git a/ydb/library/yql/core/progress_merger/progress_merger.h b/ydb/library/yql/core/progress_merger/progress_merger.h new file mode 100644 index 0000000000..e4473e808d --- /dev/null +++ b/ydb/library/yql/core/progress_merger/progress_merger.h @@ -0,0 +1,48 @@ +#include <ydb/library/yql/core/yql_execution.h> + +#include <util/generic/hash.h> +#include <util/system/spinlock.h> + + +namespace NYql::NProgressMerger { + +////////////////////////////////////////////////////////////////////////////// +// TNodeProgressBase +////////////////////////////////////////////////////////////////////////////// +class TNodeProgressBase { +public: + using EState = TOperationProgress::EState; + + TNodeProgressBase(const TOperationProgress& p); + TNodeProgressBase( + const TOperationProgress& p, + TInstant startedAt, + TInstant finishedAt, + const TVector<TOperationProgress::TStage>& stages); + + bool MergeWith(const TOperationProgress& p); + void Abort(); + bool IsUnfinished() const; + bool IsDirty() const; + void SetDirty(bool dirty); + +protected: + TOperationProgress Progress_; + TInstant StartedAt_; + TInstant FinishedAt_; + TVector<TOperationProgress::TStage> Stages_; + +private: + bool Dirty_; +}; + +////////////////////////////////////////////////////////////////////////////// +// ITaskProgressMerger +////////////////////////////////////////////////////////////////////////////// +struct ITaskProgressMerger { + virtual ~ITaskProgressMerger() = default; + virtual void MergeWith(const TOperationProgress& progress) = 0; + virtual void AbortAllUnfinishedNodes() = 0; +}; + +} // namespace NProgressMerger diff --git a/ydb/library/yql/core/progress_merger/ya.make b/ydb/library/yql/core/progress_merger/ya.make new file mode 100644 index 0000000000..1eb1a99183 --- /dev/null +++ b/ydb/library/yql/core/progress_merger/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + progress_merger.h + progress_merger.cpp +) + +PEERDIR( + ydb/library/yql/core +) + +END() diff --git a/yt/yql/plugin/bridge/interface.h b/yt/yql/plugin/bridge/interface.h index bcfc47f7c4..c51fb7e3f5 100644 --- a/yt/yql/plugin/bridge/interface.h +++ b/yt/yql/plugin/bridge/interface.h @@ -54,6 +54,8 @@ struct TBridgeQueryResult ssize_t PlanLength = 0; const char* Statistics = nullptr; ssize_t StatisticsLength = 0; + const char* Progress = nullptr; + ssize_t ProgressLength = 0; const char* TaskInfo = nullptr; ssize_t TaskInfoLength = 0; @@ -62,7 +64,8 @@ struct TBridgeQueryResult }; using TFuncBridgeFreeQueryResult = void(TBridgeQueryResult* result); -using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* impersonationUser, const char* queryText, const char* settings); +using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings); +using TFuncBridgeGetProgress = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId); //////////////////////////////////////////////////////////////////////////////// @@ -70,6 +73,7 @@ using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* XX(BridgeCreateYqlPlugin) \ XX(BridgeFreeYqlPlugin) \ XX(BridgeFreeQueryResult) \ - XX(BridgeRun) + XX(BridgeRun) \ + XX(BridgeGetProgress) //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yql/plugin/bridge/plugin.cpp b/yt/yql/plugin/bridge/plugin.cpp index e0f570f254..a69a37f987 100644 --- a/yt/yql/plugin/bridge/plugin.cpp +++ b/yt/yql/plugin/bridge/plugin.cpp @@ -13,6 +13,20 @@ namespace NBridge { //////////////////////////////////////////////////////////////////////////////// +namespace { + +std::optional<TString> ToString(const char* str, size_t strLength) +{ + if (!str) { + return std::nullopt; + } + return TString(str, strLength); +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + class TDynamicYqlPlugin { public: @@ -73,22 +87,30 @@ public: BridgePlugin_ = BridgeCreateYqlPlugin(&bridgeOptions); } - TQueryResult Run(TString impersonationUser, TString queryText, NYson::TYsonString settings) noexcept override + TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, NYson::TYsonString settings) noexcept override { auto settingsString = settings ? settings.ToString() : "{}"; - auto* bridgeQueryResult = BridgeRun(BridgePlugin_, impersonationUser.data(), queryText.data(), settingsString.data()); - auto toString = [] (const char* str, size_t strLength) -> std::optional<TString> { - if (!str) { - return std::nullopt; - } - return TString(str, strLength); + auto queryIdStr = ToString(queryId); + auto* bridgeQueryResult = BridgeRun(BridgePlugin_, queryIdStr.data(), impersonationUser.data(), queryText.data(), settingsString.data()); + TQueryResult queryResult = { + .YsonResult = ToString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength), + .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength), + .Statistics = ToString(bridgeQueryResult->Statistics, bridgeQueryResult->StatisticsLength), + .Progress = ToString(bridgeQueryResult->Progress, bridgeQueryResult->ProgressLength), + .TaskInfo = ToString(bridgeQueryResult->TaskInfo, bridgeQueryResult->TaskInfoLength), + .YsonError = ToString(bridgeQueryResult->YsonError, bridgeQueryResult->YsonErrorLength), }; + BridgeFreeQueryResult(bridgeQueryResult); + return queryResult; + } + + TQueryResult GetProgress(TQueryId queryId) noexcept override + { + auto queryIdStr = ToString(queryId); + auto* bridgeQueryResult = BridgeGetProgress(BridgePlugin_, queryIdStr.data()); TQueryResult queryResult = { - .YsonResult = toString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength), - .Plan = toString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength), - .Statistics = toString(bridgeQueryResult->Statistics, bridgeQueryResult->StatisticsLength), - .TaskInfo = toString(bridgeQueryResult->TaskInfo, bridgeQueryResult->TaskInfoLength), - .YsonError = toString(bridgeQueryResult->YsonError, bridgeQueryResult->YsonErrorLength), + .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength), + .Progress = ToString(bridgeQueryResult->Progress, bridgeQueryResult->ProgressLength), }; BridgeFreeQueryResult(bridgeQueryResult); return queryResult; diff --git a/yt/yql/plugin/dynamic/dylib.exports b/yt/yql/plugin/dynamic/dylib.exports index ec1c95cb11..fc77529eaf 100644 --- a/yt/yql/plugin/dynamic/dylib.exports +++ b/yt/yql/plugin/dynamic/dylib.exports @@ -3,6 +3,7 @@ BridgeCreateYqlPlugin BridgeFreeYqlPlugin BridgeFreeQueryResult BridgeRun +BridgeGetProgress # YQL <-> YQL UDFs interface. UdfAllocateWithSize diff --git a/yt/yql/plugin/dynamic/impl.cpp b/yt/yql/plugin/dynamic/impl.cpp index e1a8139ce3..cfb8438718 100644 --- a/yt/yql/plugin/dynamic/impl.cpp +++ b/yt/yql/plugin/dynamic/impl.cpp @@ -51,31 +51,45 @@ void BridgeFreeQueryResult(TBridgeQueryResult* result) delete result; } -TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* impersonationUser, const char* queryText, const char* settings) +void FillString(const char*& str, ssize_t& strLength, const std::optional<TString>& original) +{ + if (!original) { + str = nullptr; + strLength = 0; + return; + } + char* copy = new char[original->size() + 1]; + memcpy(copy, original->data(), original->size() + 1); + str = copy; + strLength = original->size(); +} + +TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings) { static const auto EmptyMap = TYsonString(TString("{}")); auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin); auto* bridgeResult = new TBridgeQueryResult; - auto fillString = [] (const char*& str, ssize_t& strLength, const std::optional<TString>& original) { - if (!original) { - str = nullptr; - strLength = 0; - return; - } - char* copy = new char[original->size() + 1]; - memcpy(copy, original->data(), original->size() + 1); - str = copy; - strLength = original->size(); - }; + auto result = nativePlugin->Run(NYT::TGuid::FromString(queryId), TString(impersonationUser), TString(queryText), settings ? TYsonString(TString(settings)) : EmptyMap); + FillString(bridgeResult->YsonResult, bridgeResult->YsonResultLength, result.YsonResult); + FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan); + FillString(bridgeResult->Statistics, bridgeResult->StatisticsLength, result.Statistics); + FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress); + FillString(bridgeResult->TaskInfo, bridgeResult->TaskInfoLength, result.TaskInfo); + FillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError); + + return bridgeResult; +} + +TBridgeQueryResult* BridgeGetProgress(TBridgeYqlPlugin* plugin, const char* queryId) +{ + auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin); + auto* bridgeResult = new TBridgeQueryResult; - auto result = nativePlugin->Run(TString(impersonationUser), TString(queryText), settings ? TYsonString(TString(settings)) : EmptyMap); - fillString(bridgeResult->YsonResult, bridgeResult->YsonResultLength, result.YsonResult); - fillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan); - fillString(bridgeResult->Statistics, bridgeResult->StatisticsLength, result.Statistics); - fillString(bridgeResult->TaskInfo, bridgeResult->TaskInfoLength, result.TaskInfo); - fillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError); + auto result = nativePlugin->GetProgress(NYT::TGuid::FromString(queryId)); + FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan); + FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress); return bridgeResult; } diff --git a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt index 753bac36c8..7c002160ee 100644 --- a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt +++ b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt index 348d9ca192..b85008e3c5 100644 --- a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt +++ b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt @@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt index 348d9ca192..b85008e3c5 100644 --- a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt +++ b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt @@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt index 753bac36c8..7c002160ee 100644 --- a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt +++ b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log @@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC ) target_sources(yql-plugin-native PRIVATE ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp ) add_global_library_for(yql-plugin-native.global yql-plugin-native) @@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC yql-core-file_storage core-file_storage-proto core-file_storage-http_download + yql-core-progress_merger core-services-mounts yql-core-user_data library-yql-minikql @@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC providers-common-udf_resolve providers-solomon-gateway providers-solomon-provider + library-yql-core yql-core-url_preprocessing yt-gateway-native yt-lib-log diff --git a/yt/yql/plugin/native/error_helpers.cpp b/yt/yql/plugin/native/error_helpers.cpp index 23b9fa5cba..8301c341f9 100644 --- a/yt/yql/plugin/native/error_helpers.cpp +++ b/yt/yql/plugin/native/error_helpers.cpp @@ -14,7 +14,7 @@ const int IssueToErrorCodesShift = 30000; //////////////////////////////////////////////////////////////////////////////// -TString ExceptionToYtErrorYson(const std::exception& exception) +TString MessageToYtErrorYson(const TString& message) { TStringStream yson; ::NYson::TYsonWriter writer(&yson); @@ -23,7 +23,7 @@ TString ExceptionToYtErrorYson(const std::exception& exception) writer.OnKeyedItem("code"); writer.OnInt64Scalar(1); // Generic error writer.OnKeyedItem("message"); - writer.OnStringScalar(exception.what()); + writer.OnStringScalar(message); writer.OnKeyedItem("attributes"); writer.OnBeginMap(); writer.OnEndMap(); diff --git a/yt/yql/plugin/native/error_helpers.h b/yt/yql/plugin/native/error_helpers.h index 1905502aa8..4e57cdba40 100644 --- a/yt/yql/plugin/native/error_helpers.h +++ b/yt/yql/plugin/native/error_helpers.h @@ -10,7 +10,7 @@ namespace NYT::NYqlPlugin { TString IssuesToYtErrorYson(const NYql::TIssues& issues); -TString ExceptionToYtErrorYson(const std::exception& exception); +TString MessageToYtErrorYson(const TString& message); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp index 32a632b59e..12a7586851 100644 --- a/yt/yql/plugin/native/plugin.cpp +++ b/yt/yql/plugin/native/plugin.cpp @@ -1,6 +1,7 @@ #include "plugin.h" #include "error_helpers.h" +#include "progress_merger.h" #include <ydb/library/yql/providers/yt/lib/log/yt_logger.h> #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h> @@ -26,8 +27,9 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/logging/logger.h> -#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yt/threading/rw_spin_lock.h> +#include <library/cpp/yson/node/node_io.h> #include <library/cpp/yson/parser.h> #include <library/cpp/yson/writer.h> @@ -47,6 +49,14 @@ using namespace NYson; //////////////////////////////////////////////////////////////////////////////// +struct TActiveQuery +{ + TProgressMerger ProgressMerger; + std::optional<TString> Plan; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TYqlPlugin : public IYqlPlugin { @@ -171,7 +181,7 @@ public: YQL_LOG(INFO) << "YQL plugin initialized"; } - TQueryResult GuardedRun(TString impersonationUser, TString queryText, TYsonString settings) + TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) { auto credentials = MakeIntrusive<NYql::TCredentials>(); if (YTTokenPath_) { @@ -187,6 +197,19 @@ public: auto program = ProgramFactory_->Create("-memory-", queryText); program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings)); + auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> { + if (!maybeStr) { + return std::nullopt; + } + return *maybeStr; + }; + + program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress); + ActiveQueriesProgress_[queryId].Plan = maybeToOptional(program->GetQueryPlan()); + }); + NSQLTranslation::TTranslationSettings sqlSettings; sqlSettings.ClusterMapping = Clusters_; sqlSettings.ModuleMapping = Modules_; @@ -228,28 +251,51 @@ public: yson.OnEndList(); } - auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> { - if (!maybeStr) { - return std::nullopt; - } - return *maybeStr; - }; + TString progress; + { + auto guard = WriterGuard(ProgressSpinLock); + progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString(); + ActiveQueriesProgress_.erase(queryId); + } return { .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()), .Plan = maybeToOptional(program->GetQueryPlan()), .Statistics = maybeToOptional(program->GetStatistics()), + .Progress = progress, .TaskInfo = maybeToOptional(program->GetTasksInfo()), }; } - TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept override + TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept override { try { - return GuardedRun(impersonationUser, queryText, settings); + return GuardedRun(queryId, impersonationUser, queryText, settings); } catch (const std::exception& ex) { + { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_.erase(queryId); + } + + return TQueryResult{ + .YsonError = MessageToYtErrorYson(ex.what()), + }; + } + } + + TQueryResult GetProgress(TQueryId queryId) noexcept override + { + auto guard = ReaderGuard(ProgressSpinLock); + if (ActiveQueriesProgress_.contains(queryId)) { + TQueryResult result; + if (ActiveQueriesProgress_[queryId].ProgressMerger.HasChangesSinceLastFlush()) { + result.Plan = ActiveQueriesProgress_[queryId].Plan; + result.Progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString(); + } + return result; + } else { return TQueryResult{ - .YsonError = ExceptionToYtErrorYson(ex), + .YsonError = MessageToYtErrorYson(Format("No progress for queryId: %v", queryId)), }; } } @@ -266,6 +312,8 @@ private: std::optional<TString> DefaultCluster_; THashMap<TString, TString> Modules_; TYsonString OperationAttributes_; + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ProgressSpinLock); + THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_; TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings) { diff --git a/yt/yql/plugin/native/progress_merger.cpp b/yt/yql/plugin/native/progress_merger.cpp new file mode 100644 index 0000000000..3a0a02604e --- /dev/null +++ b/yt/yql/plugin/native/progress_merger.cpp @@ -0,0 +1,116 @@ +#include "progress_merger.h" + + +namespace NYT::NYqlPlugin { + +////////////////////////////////////////////////////////////////////////////// + +TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p) + : TNodeProgressBase(p) +{} + +void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const +{ + writer.OnBeginMap(); + { + writer.OnKeyedItem("category"); + writer.OnStringScalar(Progress_.Category); + + writer.OnKeyedItem("state"); + writer.OnStringScalar(ToString(Progress_.State)); + + writer.OnKeyedItem("remoteId"); + writer.OnStringScalar(Progress_.RemoteId); + + writer.OnKeyedItem("stages"); + writer.OnBeginMap(); + for (size_t index = 0; index < Stages_.size(); index++) { + writer.OnKeyedItem(ToString(index)); + writer.OnBeginMap(); + { + writer.OnKeyedItem(Stages_[index].first); + writer.OnStringScalar(Stages_[index].second.ToString()); + } + writer.OnEndMap(); + } + writer.OnEndMap(); + + if (Progress_.Counters) { + writer.OnKeyedItem("completed"); + writer.OnUint64Scalar(Progress_.Counters->Completed); + + writer.OnKeyedItem("running"); + writer.OnUint64Scalar(Progress_.Counters->Running); + + writer.OnKeyedItem("total"); + writer.OnUint64Scalar(Progress_.Counters->Total); + + writer.OnKeyedItem("aborted"); + writer.OnUint64Scalar(Progress_.Counters->Aborted); + + writer.OnKeyedItem("failed"); + writer.OnUint64Scalar(Progress_.Counters->Failed); + + writer.OnKeyedItem("lost"); + writer.OnUint64Scalar(Progress_.Counters->Lost); + + writer.OnKeyedItem("pending"); + writer.OnUint64Scalar(Progress_.Counters->Pending); + } + + writer.OnKeyedItem("startedAt"); + writer.OnStringScalar(StartedAt_.ToString()); + + if (FinishedAt_ != TInstant::Max()) { + writer.OnKeyedItem("finishedAt"); + writer.OnStringScalar(FinishedAt_.ToString()); + } + } + writer.OnEndMap(); +} + +////////////////////////////////////////////////////////////////////////////// + +void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress) +{ + auto in = NodesMap_.emplace(progress.Id, progress); + if (!in.second) { + in.first->second.MergeWith(progress); + } + HasChanges_ = true; +} + +void TProgressMerger::AbortAllUnfinishedNodes() +{ + for (auto& node: NodesMap_) { + if (node.second.IsUnfinished()) { + node.second.Abort(); + HasChanges_ = true; + } + } +} + +TString TProgressMerger::ToYsonString() +{ + TStringStream yson; + ::NYson::TYsonWriter writer(&yson); + + writer.OnBeginMap(); + for (auto& node: NodesMap_) { + writer.OnKeyedItem(ToString(node.first)); + node.second.Serialize(writer); + } + writer.OnEndMap(); + HasChanges_ = false; + + return yson.Str(); +} + +bool TProgressMerger::HasChangesSinceLastFlush() const +{ + return HasChanges_; +} + +////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYqlPlugin diff --git a/yt/yql/plugin/native/progress_merger.h b/yt/yql/plugin/native/progress_merger.h new file mode 100644 index 0000000000..3bb5f101d0 --- /dev/null +++ b/yt/yql/plugin/native/progress_merger.h @@ -0,0 +1,38 @@ +#pragma once + +#include <ydb/library/yql/core/yql_execution.h> +#include <ydb/library/yql/core/progress_merger/progress_merger.h> + +#include <library/cpp/yson/writer.h> + +namespace NYT::NYqlPlugin { + +using namespace NYql::NProgressMerger; +using namespace NYson; + +////////////////////////////////////////////////////////////////////////////// + +class TNodeProgress : public TNodeProgressBase { +public: + TNodeProgress(const NYql::TOperationProgress& p); + void Serialize(::NYson::TYsonWriter& yson) const; +}; + +////////////////////////////////////////////////////////////////////////////// + +class TProgressMerger : public ITaskProgressMerger { +public: + void MergeWith(const NYql::TOperationProgress& progress) override; + void AbortAllUnfinishedNodes() override; + + bool HasChangesSinceLastFlush() const; + TString ToYsonString(); + +private: + bool HasChanges_ = false; + THashMap<ui32, TNodeProgress> NodesMap_; +}; + +////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYqlPlugin diff --git a/yt/yql/plugin/native/ya.make b/yt/yql/plugin/native/ya.make index 36d562a4d2..15f3851411 100644 --- a/yt/yql/plugin/native/ya.make +++ b/yt/yql/plugin/native/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( GLOBAL plugin.cpp error_helpers.cpp + progress_merger.cpp ) PEERDIR( @@ -19,6 +20,7 @@ PEERDIR( ydb/library/yql/core/file_storage ydb/library/yql/core/file_storage/proto ydb/library/yql/core/file_storage/http_download + ydb/library/yql/core/progress_merger ydb/library/yql/core/services/mounts ydb/library/yql/core/user_data ydb/library/yql/minikql @@ -30,6 +32,7 @@ PEERDIR( ydb/library/yql/providers/common/udf_resolve ydb/library/yql/providers/solomon/gateway ydb/library/yql/providers/solomon/provider + ydb/library/yql/core ydb/library/yql/core/url_preprocessing ydb/library/yql/providers/yt/gateway/native ydb/library/yql/providers/yt/lib/log diff --git a/yt/yql/plugin/plugin.h b/yt/yql/plugin/plugin.h index 46a08ba5e7..97e26c8c04 100644 --- a/yt/yql/plugin/plugin.h +++ b/yt/yql/plugin/plugin.h @@ -5,6 +5,8 @@ #include <library/cpp/logger/log.h> +#include <library/cpp/yt/string/guid.h> + #include <library/cpp/yt/yson_string/string.h> #include <optional> @@ -15,6 +17,10 @@ using namespace NYson; //////////////////////////////////////////////////////////////////////////////// +using TQueryId = TGuid; + +//////////////////////////////////////////////////////////////////////////////// + class TYqlPluginOptions { public: @@ -39,6 +45,7 @@ struct TQueryResult std::optional<TString> YsonResult; std::optional<TString> Plan; std::optional<TString> Statistics; + std::optional<TString> Progress; std::optional<TString> TaskInfo; //! YSON representation of a YT error. @@ -49,9 +56,13 @@ struct TQueryResult //! There are two major implementation: one of them is based //! on YQL code and another wraps the pure C bridge interface, which //! is implemented by a dynamic library. +/*! +* \note Thread affinity: any +*/ struct IYqlPlugin { - virtual TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept = 0; + virtual TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept = 0; + virtual TQueryResult GetProgress(TQueryId queryId) noexcept = 0; virtual ~IYqlPlugin() = default; }; |