aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-09-01 12:24:10 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-09-01 12:42:02 +0300
commitcc871496ef02852bf6ca96470fa61c253940ddf4 (patch)
treea11a70eafe634c39c7f5ab7a6fabb12667ea5d63
parent79de9208b0944f6eba265db2b34f4c36735dd06c (diff)
downloadydb-cc871496ef02852bf6ca96470fa61c253940ddf4.tar.gz
Intermediate changes
-rw-r--r--ydb/library/yql/core/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/core/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/core/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/core/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/core/progress_merger/CMakeLists.darwin-x86_64.txt18
-rw-r--r--ydb/library/yql/core/progress_merger/CMakeLists.linux-aarch64.txt19
-rw-r--r--ydb/library/yql/core/progress_merger/CMakeLists.linux-x86_64.txt19
-rw-r--r--ydb/library/yql/core/progress_merger/CMakeLists.txt17
-rw-r--r--ydb/library/yql/core/progress_merger/CMakeLists.windows-x86_64.txt18
-rw-r--r--ydb/library/yql/core/progress_merger/progress_merger.cpp89
-rw-r--r--ydb/library/yql/core/progress_merger/progress_merger.h48
-rw-r--r--ydb/library/yql/core/progress_merger/ya.make12
-rw-r--r--yt/yql/plugin/bridge/interface.h8
-rw-r--r--yt/yql/plugin/bridge/plugin.cpp46
-rw-r--r--yt/yql/plugin/dynamic/dylib.exports1
-rw-r--r--yt/yql/plugin/dynamic/impl.cpp50
-rw-r--r--yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt5
-rw-r--r--yt/yql/plugin/native/CMakeLists.linux-aarch64.txt5
-rw-r--r--yt/yql/plugin/native/CMakeLists.linux-x86_64.txt5
-rw-r--r--yt/yql/plugin/native/CMakeLists.windows-x86_64.txt5
-rw-r--r--yt/yql/plugin/native/error_helpers.cpp4
-rw-r--r--yt/yql/plugin/native/error_helpers.h2
-rw-r--r--yt/yql/plugin/native/plugin.cpp70
-rw-r--r--yt/yql/plugin/native/progress_merger.cpp116
-rw-r--r--yt/yql/plugin/native/progress_merger.h38
-rw-r--r--yt/yql/plugin/native/ya.make3
-rw-r--r--yt/yql/plugin/plugin.h13
27 files changed, 568 insertions, 47 deletions
diff --git a/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt
index a29f25c4c6..0e86b20c36 100644
--- a/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt
@@ -17,6 +17,7 @@ add_subdirectory(facade)
add_subdirectory(file_storage)
add_subdirectory(issue)
add_subdirectory(peephole_opt)
+add_subdirectory(progress_merger)
add_subdirectory(services)
add_subdirectory(spilling)
add_subdirectory(sql_types)
diff --git a/ydb/library/yql/core/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/CMakeLists.linux-aarch64.txt
index ad218bf318..b0e95587bd 100644
--- a/ydb/library/yql/core/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/core/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ add_subdirectory(facade)
add_subdirectory(file_storage)
add_subdirectory(issue)
add_subdirectory(peephole_opt)
+add_subdirectory(progress_merger)
add_subdirectory(services)
add_subdirectory(spilling)
add_subdirectory(sql_types)
diff --git a/ydb/library/yql/core/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/CMakeLists.linux-x86_64.txt
index ad218bf318..b0e95587bd 100644
--- a/ydb/library/yql/core/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/core/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,7 @@ add_subdirectory(facade)
add_subdirectory(file_storage)
add_subdirectory(issue)
add_subdirectory(peephole_opt)
+add_subdirectory(progress_merger)
add_subdirectory(services)
add_subdirectory(spilling)
add_subdirectory(sql_types)
diff --git a/ydb/library/yql/core/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/CMakeLists.windows-x86_64.txt
index a29f25c4c6..0e86b20c36 100644
--- a/ydb/library/yql/core/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/core/CMakeLists.windows-x86_64.txt
@@ -17,6 +17,7 @@ add_subdirectory(facade)
add_subdirectory(file_storage)
add_subdirectory(issue)
add_subdirectory(peephole_opt)
+add_subdirectory(progress_merger)
add_subdirectory(services)
add_subdirectory(spilling)
add_subdirectory(sql_types)
diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..3e9b4c5217
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-core-progress_merger)
+target_link_libraries(yql-core-progress_merger PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-core
+)
+target_sources(yql-core-progress_merger PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp
+)
diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..a70e773280
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-core-progress_merger)
+target_link_libraries(yql-core-progress_merger PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-core
+)
+target_sources(yql-core-progress_merger PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp
+)
diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..a70e773280
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-core-progress_merger)
+target_link_libraries(yql-core-progress_merger PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-core
+)
+target_sources(yql-core-progress_merger PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp
+)
diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.txt b/ydb/library/yql/core/progress_merger/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/core/progress_merger/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/progress_merger/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..3e9b4c5217
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-core-progress_merger)
+target_link_libraries(yql-core-progress_merger PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-core
+)
+target_sources(yql-core-progress_merger PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/progress_merger/progress_merger.cpp
+)
diff --git a/ydb/library/yql/core/progress_merger/progress_merger.cpp b/ydb/library/yql/core/progress_merger/progress_merger.cpp
new file mode 100644
index 0000000000..e64b9ac909
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/progress_merger.cpp
@@ -0,0 +1,89 @@
+#include "progress_merger.h"
+
+
+namespace NYql::NProgressMerger {
+
+//////////////////////////////////////////////////////////////////////////////
+// TNodeProgressBase
+//////////////////////////////////////////////////////////////////////////////
+
+TNodeProgressBase::TNodeProgressBase(const TOperationProgress& p)
+ : Progress_(p)
+ , StartedAt_(TInstant::Now())
+ , FinishedAt_(p.State == EState::Finished ? StartedAt_ : TInstant::Max())
+ , Dirty_(true)
+{
+ if (!p.Stage.first.empty()) {
+ Stages_.emplace_back(p.Stage);
+ }
+}
+
+TNodeProgressBase::TNodeProgressBase(
+ const TOperationProgress& p,
+ TInstant startedAt,
+ TInstant finishedAt,
+ const TVector<TOperationProgress::TStage>& stages)
+ : Progress_(p)
+ , StartedAt_(startedAt)
+ , FinishedAt_(finishedAt)
+ , Stages_(stages)
+ , Dirty_(true)
+{}
+
+
+bool TNodeProgressBase::MergeWith(const TOperationProgress& p) {
+ bool dirty = false;
+
+ // (1) remote id
+ if (!p.RemoteId.empty() && p.RemoteId != Progress_.RemoteId) {
+ Progress_.RemoteId = p.RemoteId;
+ dirty = true;
+ }
+
+ // (2) state
+ if (p.State != Progress_.State) {
+ Progress_.State = p.State;
+ dirty = true;
+ }
+
+ // (3) counters
+ if (p.Counters && (!Progress_.Counters || *p.Counters != *Progress_.Counters)) {
+ Progress_.Counters = p.Counters;
+ dirty = true;
+ }
+
+ // (4) finished time
+ if (Progress_.State == EState::Finished) {
+ FinishedAt_ = TInstant::Now();
+ dirty = true;
+ }
+
+ // (5) stage
+ if (!p.Stage.first.empty() && Progress_.Stage != p.Stage) {
+ Progress_.Stage = p.Stage;
+ Stages_.push_back(p.Stage);
+ dirty = true;
+ }
+ return Dirty_ = dirty;
+}
+
+void TNodeProgressBase::Abort() {
+ Progress_.State = EState::Aborted;
+ FinishedAt_ = TInstant::Now();
+ Dirty_ = true;
+}
+
+bool TNodeProgressBase::IsUnfinished() const {
+ return Progress_.State == EState::Started ||
+ Progress_.State == EState::InProgress;
+}
+
+bool TNodeProgressBase::IsDirty() const {
+ return Dirty_;
+}
+
+void TNodeProgressBase::SetDirty(bool dirty) {
+ Dirty_ = dirty;
+}
+
+} // namespace NYql::NProgressMerger
diff --git a/ydb/library/yql/core/progress_merger/progress_merger.h b/ydb/library/yql/core/progress_merger/progress_merger.h
new file mode 100644
index 0000000000..e4473e808d
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/progress_merger.h
@@ -0,0 +1,48 @@
+#include <ydb/library/yql/core/yql_execution.h>
+
+#include <util/generic/hash.h>
+#include <util/system/spinlock.h>
+
+
+namespace NYql::NProgressMerger {
+
+//////////////////////////////////////////////////////////////////////////////
+// TNodeProgressBase
+//////////////////////////////////////////////////////////////////////////////
+class TNodeProgressBase {
+public:
+ using EState = TOperationProgress::EState;
+
+ TNodeProgressBase(const TOperationProgress& p);
+ TNodeProgressBase(
+ const TOperationProgress& p,
+ TInstant startedAt,
+ TInstant finishedAt,
+ const TVector<TOperationProgress::TStage>& stages);
+
+ bool MergeWith(const TOperationProgress& p);
+ void Abort();
+ bool IsUnfinished() const;
+ bool IsDirty() const;
+ void SetDirty(bool dirty);
+
+protected:
+ TOperationProgress Progress_;
+ TInstant StartedAt_;
+ TInstant FinishedAt_;
+ TVector<TOperationProgress::TStage> Stages_;
+
+private:
+ bool Dirty_;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+// ITaskProgressMerger
+//////////////////////////////////////////////////////////////////////////////
+struct ITaskProgressMerger {
+ virtual ~ITaskProgressMerger() = default;
+ virtual void MergeWith(const TOperationProgress& progress) = 0;
+ virtual void AbortAllUnfinishedNodes() = 0;
+};
+
+} // namespace NProgressMerger
diff --git a/ydb/library/yql/core/progress_merger/ya.make b/ydb/library/yql/core/progress_merger/ya.make
new file mode 100644
index 0000000000..1eb1a99183
--- /dev/null
+++ b/ydb/library/yql/core/progress_merger/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ progress_merger.h
+ progress_merger.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/core
+)
+
+END()
diff --git a/yt/yql/plugin/bridge/interface.h b/yt/yql/plugin/bridge/interface.h
index bcfc47f7c4..c51fb7e3f5 100644
--- a/yt/yql/plugin/bridge/interface.h
+++ b/yt/yql/plugin/bridge/interface.h
@@ -54,6 +54,8 @@ struct TBridgeQueryResult
ssize_t PlanLength = 0;
const char* Statistics = nullptr;
ssize_t StatisticsLength = 0;
+ const char* Progress = nullptr;
+ ssize_t ProgressLength = 0;
const char* TaskInfo = nullptr;
ssize_t TaskInfoLength = 0;
@@ -62,7 +64,8 @@ struct TBridgeQueryResult
};
using TFuncBridgeFreeQueryResult = void(TBridgeQueryResult* result);
-using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* impersonationUser, const char* queryText, const char* settings);
+using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings);
+using TFuncBridgeGetProgress = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId);
////////////////////////////////////////////////////////////////////////////////
@@ -70,6 +73,7 @@ using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char*
XX(BridgeCreateYqlPlugin) \
XX(BridgeFreeYqlPlugin) \
XX(BridgeFreeQueryResult) \
- XX(BridgeRun)
+ XX(BridgeRun) \
+ XX(BridgeGetProgress)
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/plugin/bridge/plugin.cpp b/yt/yql/plugin/bridge/plugin.cpp
index e0f570f254..a69a37f987 100644
--- a/yt/yql/plugin/bridge/plugin.cpp
+++ b/yt/yql/plugin/bridge/plugin.cpp
@@ -13,6 +13,20 @@ namespace NBridge {
////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+std::optional<TString> ToString(const char* str, size_t strLength)
+{
+ if (!str) {
+ return std::nullopt;
+ }
+ return TString(str, strLength);
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
class TDynamicYqlPlugin
{
public:
@@ -73,22 +87,30 @@ public:
BridgePlugin_ = BridgeCreateYqlPlugin(&bridgeOptions);
}
- TQueryResult Run(TString impersonationUser, TString queryText, NYson::TYsonString settings) noexcept override
+ TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, NYson::TYsonString settings) noexcept override
{
auto settingsString = settings ? settings.ToString() : "{}";
- auto* bridgeQueryResult = BridgeRun(BridgePlugin_, impersonationUser.data(), queryText.data(), settingsString.data());
- auto toString = [] (const char* str, size_t strLength) -> std::optional<TString> {
- if (!str) {
- return std::nullopt;
- }
- return TString(str, strLength);
+ auto queryIdStr = ToString(queryId);
+ auto* bridgeQueryResult = BridgeRun(BridgePlugin_, queryIdStr.data(), impersonationUser.data(), queryText.data(), settingsString.data());
+ TQueryResult queryResult = {
+ .YsonResult = ToString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength),
+ .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),
+ .Statistics = ToString(bridgeQueryResult->Statistics, bridgeQueryResult->StatisticsLength),
+ .Progress = ToString(bridgeQueryResult->Progress, bridgeQueryResult->ProgressLength),
+ .TaskInfo = ToString(bridgeQueryResult->TaskInfo, bridgeQueryResult->TaskInfoLength),
+ .YsonError = ToString(bridgeQueryResult->YsonError, bridgeQueryResult->YsonErrorLength),
};
+ BridgeFreeQueryResult(bridgeQueryResult);
+ return queryResult;
+ }
+
+ TQueryResult GetProgress(TQueryId queryId) noexcept override
+ {
+ auto queryIdStr = ToString(queryId);
+ auto* bridgeQueryResult = BridgeGetProgress(BridgePlugin_, queryIdStr.data());
TQueryResult queryResult = {
- .YsonResult = toString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength),
- .Plan = toString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),
- .Statistics = toString(bridgeQueryResult->Statistics, bridgeQueryResult->StatisticsLength),
- .TaskInfo = toString(bridgeQueryResult->TaskInfo, bridgeQueryResult->TaskInfoLength),
- .YsonError = toString(bridgeQueryResult->YsonError, bridgeQueryResult->YsonErrorLength),
+ .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),
+ .Progress = ToString(bridgeQueryResult->Progress, bridgeQueryResult->ProgressLength),
};
BridgeFreeQueryResult(bridgeQueryResult);
return queryResult;
diff --git a/yt/yql/plugin/dynamic/dylib.exports b/yt/yql/plugin/dynamic/dylib.exports
index ec1c95cb11..fc77529eaf 100644
--- a/yt/yql/plugin/dynamic/dylib.exports
+++ b/yt/yql/plugin/dynamic/dylib.exports
@@ -3,6 +3,7 @@ BridgeCreateYqlPlugin
BridgeFreeYqlPlugin
BridgeFreeQueryResult
BridgeRun
+BridgeGetProgress
# YQL <-> YQL UDFs interface.
UdfAllocateWithSize
diff --git a/yt/yql/plugin/dynamic/impl.cpp b/yt/yql/plugin/dynamic/impl.cpp
index e1a8139ce3..cfb8438718 100644
--- a/yt/yql/plugin/dynamic/impl.cpp
+++ b/yt/yql/plugin/dynamic/impl.cpp
@@ -51,31 +51,45 @@ void BridgeFreeQueryResult(TBridgeQueryResult* result)
delete result;
}
-TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* impersonationUser, const char* queryText, const char* settings)
+void FillString(const char*& str, ssize_t& strLength, const std::optional<TString>& original)
+{
+ if (!original) {
+ str = nullptr;
+ strLength = 0;
+ return;
+ }
+ char* copy = new char[original->size() + 1];
+ memcpy(copy, original->data(), original->size() + 1);
+ str = copy;
+ strLength = original->size();
+}
+
+TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings)
{
static const auto EmptyMap = TYsonString(TString("{}"));
auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
auto* bridgeResult = new TBridgeQueryResult;
- auto fillString = [] (const char*& str, ssize_t& strLength, const std::optional<TString>& original) {
- if (!original) {
- str = nullptr;
- strLength = 0;
- return;
- }
- char* copy = new char[original->size() + 1];
- memcpy(copy, original->data(), original->size() + 1);
- str = copy;
- strLength = original->size();
- };
+ auto result = nativePlugin->Run(NYT::TGuid::FromString(queryId), TString(impersonationUser), TString(queryText), settings ? TYsonString(TString(settings)) : EmptyMap);
+ FillString(bridgeResult->YsonResult, bridgeResult->YsonResultLength, result.YsonResult);
+ FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
+ FillString(bridgeResult->Statistics, bridgeResult->StatisticsLength, result.Statistics);
+ FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress);
+ FillString(bridgeResult->TaskInfo, bridgeResult->TaskInfoLength, result.TaskInfo);
+ FillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError);
+
+ return bridgeResult;
+}
+
+TBridgeQueryResult* BridgeGetProgress(TBridgeYqlPlugin* plugin, const char* queryId)
+{
+ auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
+ auto* bridgeResult = new TBridgeQueryResult;
- auto result = nativePlugin->Run(TString(impersonationUser), TString(queryText), settings ? TYsonString(TString(settings)) : EmptyMap);
- fillString(bridgeResult->YsonResult, bridgeResult->YsonResultLength, result.YsonResult);
- fillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
- fillString(bridgeResult->Statistics, bridgeResult->StatisticsLength, result.Statistics);
- fillString(bridgeResult->TaskInfo, bridgeResult->TaskInfoLength, result.TaskInfo);
- fillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError);
+ auto result = nativePlugin->GetProgress(NYT::TGuid::FromString(queryId));
+ FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
+ FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress);
return bridgeResult;
}
diff --git a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt
index 753bac36c8..7c002160ee 100644
--- a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt
+++ b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt
index 348d9ca192..b85008e3c5 100644
--- a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt
+++ b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt
@@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt
index 348d9ca192..b85008e3c5 100644
--- a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt
+++ b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt
@@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt
index 753bac36c8..7c002160ee 100644
--- a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt
+++ b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/error_helpers.cpp b/yt/yql/plugin/native/error_helpers.cpp
index 23b9fa5cba..8301c341f9 100644
--- a/yt/yql/plugin/native/error_helpers.cpp
+++ b/yt/yql/plugin/native/error_helpers.cpp
@@ -14,7 +14,7 @@ const int IssueToErrorCodesShift = 30000;
////////////////////////////////////////////////////////////////////////////////
-TString ExceptionToYtErrorYson(const std::exception& exception)
+TString MessageToYtErrorYson(const TString& message)
{
TStringStream yson;
::NYson::TYsonWriter writer(&yson);
@@ -23,7 +23,7 @@ TString ExceptionToYtErrorYson(const std::exception& exception)
writer.OnKeyedItem("code");
writer.OnInt64Scalar(1); // Generic error
writer.OnKeyedItem("message");
- writer.OnStringScalar(exception.what());
+ writer.OnStringScalar(message);
writer.OnKeyedItem("attributes");
writer.OnBeginMap();
writer.OnEndMap();
diff --git a/yt/yql/plugin/native/error_helpers.h b/yt/yql/plugin/native/error_helpers.h
index 1905502aa8..4e57cdba40 100644
--- a/yt/yql/plugin/native/error_helpers.h
+++ b/yt/yql/plugin/native/error_helpers.h
@@ -10,7 +10,7 @@ namespace NYT::NYqlPlugin {
TString IssuesToYtErrorYson(const NYql::TIssues& issues);
-TString ExceptionToYtErrorYson(const std::exception& exception);
+TString MessageToYtErrorYson(const TString& message);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp
index 32a632b59e..12a7586851 100644
--- a/yt/yql/plugin/native/plugin.cpp
+++ b/yt/yql/plugin/native/plugin.cpp
@@ -1,6 +1,7 @@
#include "plugin.h"
#include "error_helpers.h"
+#include "progress_merger.h"
#include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
@@ -26,8 +27,9 @@
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/logging/logger.h>
-#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/yson/parser.h>
#include <library/cpp/yson/writer.h>
@@ -47,6 +49,14 @@ using namespace NYson;
////////////////////////////////////////////////////////////////////////////////
+struct TActiveQuery
+{
+ TProgressMerger ProgressMerger;
+ std::optional<TString> Plan;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TYqlPlugin
: public IYqlPlugin
{
@@ -171,7 +181,7 @@ public:
YQL_LOG(INFO) << "YQL plugin initialized";
}
- TQueryResult GuardedRun(TString impersonationUser, TString queryText, TYsonString settings)
+ TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings)
{
auto credentials = MakeIntrusive<NYql::TCredentials>();
if (YTTokenPath_) {
@@ -187,6 +197,19 @@ public:
auto program = ProgramFactory_->Create("-memory-", queryText);
program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
+ auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
+ if (!maybeStr) {
+ return std::nullopt;
+ }
+ return *maybeStr;
+ };
+
+ program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress);
+ ActiveQueriesProgress_[queryId].Plan = maybeToOptional(program->GetQueryPlan());
+ });
+
NSQLTranslation::TTranslationSettings sqlSettings;
sqlSettings.ClusterMapping = Clusters_;
sqlSettings.ModuleMapping = Modules_;
@@ -228,28 +251,51 @@ public:
yson.OnEndList();
}
- auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
- if (!maybeStr) {
- return std::nullopt;
- }
- return *maybeStr;
- };
+ TString progress;
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
+ ActiveQueriesProgress_.erase(queryId);
+ }
return {
.YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
.Plan = maybeToOptional(program->GetQueryPlan()),
.Statistics = maybeToOptional(program->GetStatistics()),
+ .Progress = progress,
.TaskInfo = maybeToOptional(program->GetTasksInfo()),
};
}
- TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept override
+ TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept override
{
try {
- return GuardedRun(impersonationUser, queryText, settings);
+ return GuardedRun(queryId, impersonationUser, queryText, settings);
} catch (const std::exception& ex) {
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_.erase(queryId);
+ }
+
+ return TQueryResult{
+ .YsonError = MessageToYtErrorYson(ex.what()),
+ };
+ }
+ }
+
+ TQueryResult GetProgress(TQueryId queryId) noexcept override
+ {
+ auto guard = ReaderGuard(ProgressSpinLock);
+ if (ActiveQueriesProgress_.contains(queryId)) {
+ TQueryResult result;
+ if (ActiveQueriesProgress_[queryId].ProgressMerger.HasChangesSinceLastFlush()) {
+ result.Plan = ActiveQueriesProgress_[queryId].Plan;
+ result.Progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
+ }
+ return result;
+ } else {
return TQueryResult{
- .YsonError = ExceptionToYtErrorYson(ex),
+ .YsonError = MessageToYtErrorYson(Format("No progress for queryId: %v", queryId)),
};
}
}
@@ -266,6 +312,8 @@ private:
std::optional<TString> DefaultCluster_;
THashMap<TString, TString> Modules_;
TYsonString OperationAttributes_;
+ YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ProgressSpinLock);
+ THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_;
TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
{
diff --git a/yt/yql/plugin/native/progress_merger.cpp b/yt/yql/plugin/native/progress_merger.cpp
new file mode 100644
index 0000000000..3a0a02604e
--- /dev/null
+++ b/yt/yql/plugin/native/progress_merger.cpp
@@ -0,0 +1,116 @@
+#include "progress_merger.h"
+
+
+namespace NYT::NYqlPlugin {
+
+//////////////////////////////////////////////////////////////////////////////
+
+TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p)
+ : TNodeProgressBase(p)
+{}
+
+void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const
+{
+ writer.OnBeginMap();
+ {
+ writer.OnKeyedItem("category");
+ writer.OnStringScalar(Progress_.Category);
+
+ writer.OnKeyedItem("state");
+ writer.OnStringScalar(ToString(Progress_.State));
+
+ writer.OnKeyedItem("remoteId");
+ writer.OnStringScalar(Progress_.RemoteId);
+
+ writer.OnKeyedItem("stages");
+ writer.OnBeginMap();
+ for (size_t index = 0; index < Stages_.size(); index++) {
+ writer.OnKeyedItem(ToString(index));
+ writer.OnBeginMap();
+ {
+ writer.OnKeyedItem(Stages_[index].first);
+ writer.OnStringScalar(Stages_[index].second.ToString());
+ }
+ writer.OnEndMap();
+ }
+ writer.OnEndMap();
+
+ if (Progress_.Counters) {
+ writer.OnKeyedItem("completed");
+ writer.OnUint64Scalar(Progress_.Counters->Completed);
+
+ writer.OnKeyedItem("running");
+ writer.OnUint64Scalar(Progress_.Counters->Running);
+
+ writer.OnKeyedItem("total");
+ writer.OnUint64Scalar(Progress_.Counters->Total);
+
+ writer.OnKeyedItem("aborted");
+ writer.OnUint64Scalar(Progress_.Counters->Aborted);
+
+ writer.OnKeyedItem("failed");
+ writer.OnUint64Scalar(Progress_.Counters->Failed);
+
+ writer.OnKeyedItem("lost");
+ writer.OnUint64Scalar(Progress_.Counters->Lost);
+
+ writer.OnKeyedItem("pending");
+ writer.OnUint64Scalar(Progress_.Counters->Pending);
+ }
+
+ writer.OnKeyedItem("startedAt");
+ writer.OnStringScalar(StartedAt_.ToString());
+
+ if (FinishedAt_ != TInstant::Max()) {
+ writer.OnKeyedItem("finishedAt");
+ writer.OnStringScalar(FinishedAt_.ToString());
+ }
+ }
+ writer.OnEndMap();
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress)
+{
+ auto in = NodesMap_.emplace(progress.Id, progress);
+ if (!in.second) {
+ in.first->second.MergeWith(progress);
+ }
+ HasChanges_ = true;
+}
+
+void TProgressMerger::AbortAllUnfinishedNodes()
+{
+ for (auto& node: NodesMap_) {
+ if (node.second.IsUnfinished()) {
+ node.second.Abort();
+ HasChanges_ = true;
+ }
+ }
+}
+
+TString TProgressMerger::ToYsonString()
+{
+ TStringStream yson;
+ ::NYson::TYsonWriter writer(&yson);
+
+ writer.OnBeginMap();
+ for (auto& node: NodesMap_) {
+ writer.OnKeyedItem(ToString(node.first));
+ node.second.Serialize(writer);
+ }
+ writer.OnEndMap();
+ HasChanges_ = false;
+
+ return yson.Str();
+}
+
+bool TProgressMerger::HasChangesSinceLastFlush() const
+{
+ return HasChanges_;
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYqlPlugin
diff --git a/yt/yql/plugin/native/progress_merger.h b/yt/yql/plugin/native/progress_merger.h
new file mode 100644
index 0000000000..3bb5f101d0
--- /dev/null
+++ b/yt/yql/plugin/native/progress_merger.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <ydb/library/yql/core/yql_execution.h>
+#include <ydb/library/yql/core/progress_merger/progress_merger.h>
+
+#include <library/cpp/yson/writer.h>
+
+namespace NYT::NYqlPlugin {
+
+using namespace NYql::NProgressMerger;
+using namespace NYson;
+
+//////////////////////////////////////////////////////////////////////////////
+
+class TNodeProgress : public TNodeProgressBase {
+public:
+ TNodeProgress(const NYql::TOperationProgress& p);
+ void Serialize(::NYson::TYsonWriter& yson) const;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+
+class TProgressMerger : public ITaskProgressMerger {
+public:
+ void MergeWith(const NYql::TOperationProgress& progress) override;
+ void AbortAllUnfinishedNodes() override;
+
+ bool HasChangesSinceLastFlush() const;
+ TString ToYsonString();
+
+private:
+ bool HasChanges_ = false;
+ THashMap<ui32, TNodeProgress> NodesMap_;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYqlPlugin
diff --git a/yt/yql/plugin/native/ya.make b/yt/yql/plugin/native/ya.make
index 36d562a4d2..15f3851411 100644
--- a/yt/yql/plugin/native/ya.make
+++ b/yt/yql/plugin/native/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
GLOBAL plugin.cpp
error_helpers.cpp
+ progress_merger.cpp
)
PEERDIR(
@@ -19,6 +20,7 @@ PEERDIR(
ydb/library/yql/core/file_storage
ydb/library/yql/core/file_storage/proto
ydb/library/yql/core/file_storage/http_download
+ ydb/library/yql/core/progress_merger
ydb/library/yql/core/services/mounts
ydb/library/yql/core/user_data
ydb/library/yql/minikql
@@ -30,6 +32,7 @@ PEERDIR(
ydb/library/yql/providers/common/udf_resolve
ydb/library/yql/providers/solomon/gateway
ydb/library/yql/providers/solomon/provider
+ ydb/library/yql/core
ydb/library/yql/core/url_preprocessing
ydb/library/yql/providers/yt/gateway/native
ydb/library/yql/providers/yt/lib/log
diff --git a/yt/yql/plugin/plugin.h b/yt/yql/plugin/plugin.h
index 46a08ba5e7..97e26c8c04 100644
--- a/yt/yql/plugin/plugin.h
+++ b/yt/yql/plugin/plugin.h
@@ -5,6 +5,8 @@
#include <library/cpp/logger/log.h>
+#include <library/cpp/yt/string/guid.h>
+
#include <library/cpp/yt/yson_string/string.h>
#include <optional>
@@ -15,6 +17,10 @@ using namespace NYson;
////////////////////////////////////////////////////////////////////////////////
+using TQueryId = TGuid;
+
+////////////////////////////////////////////////////////////////////////////////
+
class TYqlPluginOptions
{
public:
@@ -39,6 +45,7 @@ struct TQueryResult
std::optional<TString> YsonResult;
std::optional<TString> Plan;
std::optional<TString> Statistics;
+ std::optional<TString> Progress;
std::optional<TString> TaskInfo;
//! YSON representation of a YT error.
@@ -49,9 +56,13 @@ struct TQueryResult
//! There are two major implementation: one of them is based
//! on YQL code and another wraps the pure C bridge interface, which
//! is implemented by a dynamic library.
+/*!
+* \note Thread affinity: any
+*/
struct IYqlPlugin
{
- virtual TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept = 0;
+ virtual TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept = 0;
+ virtual TQueryResult GetProgress(TQueryId queryId) noexcept = 0;
virtual ~IYqlPlugin() = default;
};