aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql/plugin/native
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-09-01 12:24:10 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-09-01 12:42:02 +0300
commitcc871496ef02852bf6ca96470fa61c253940ddf4 (patch)
treea11a70eafe634c39c7f5ab7a6fabb12667ea5d63 /yt/yql/plugin/native
parent79de9208b0944f6eba265db2b34f4c36735dd06c (diff)
downloadydb-cc871496ef02852bf6ca96470fa61c253940ddf4.tar.gz
Intermediate changes
Diffstat (limited to 'yt/yql/plugin/native')
-rw-r--r--yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt5
-rw-r--r--yt/yql/plugin/native/CMakeLists.linux-aarch64.txt5
-rw-r--r--yt/yql/plugin/native/CMakeLists.linux-x86_64.txt5
-rw-r--r--yt/yql/plugin/native/CMakeLists.windows-x86_64.txt5
-rw-r--r--yt/yql/plugin/native/error_helpers.cpp4
-rw-r--r--yt/yql/plugin/native/error_helpers.h2
-rw-r--r--yt/yql/plugin/native/plugin.cpp70
-rw-r--r--yt/yql/plugin/native/progress_merger.cpp116
-rw-r--r--yt/yql/plugin/native/progress_merger.h38
-rw-r--r--yt/yql/plugin/native/ya.make3
10 files changed, 239 insertions, 14 deletions
diff --git a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt
index 753bac36c8..7c002160ee 100644
--- a/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt
+++ b/yt/yql/plugin/native/CMakeLists.darwin-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt
index 348d9ca192..b85008e3c5 100644
--- a/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt
+++ b/yt/yql/plugin/native/CMakeLists.linux-aarch64.txt
@@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt
index 348d9ca192..b85008e3c5 100644
--- a/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt
+++ b/yt/yql/plugin/native/CMakeLists.linux-x86_64.txt
@@ -28,6 +28,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -39,6 +40,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -48,6 +50,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -71,6 +74,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -82,6 +86,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt
index 753bac36c8..7c002160ee 100644
--- a/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt
+++ b/yt/yql/plugin/native/CMakeLists.windows-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(yql-plugin-native PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -38,6 +39,7 @@ target_link_libraries(yql-plugin-native PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
@@ -47,6 +49,7 @@ target_link_libraries(yql-plugin-native PUBLIC
)
target_sources(yql-plugin-native PRIVATE
${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/error_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yql/plugin/native/progress_merger.cpp
)
add_global_library_for(yql-plugin-native.global yql-plugin-native)
@@ -69,6 +72,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
yql-core-file_storage
core-file_storage-proto
core-file_storage-http_download
+ yql-core-progress_merger
core-services-mounts
yql-core-user_data
library-yql-minikql
@@ -80,6 +84,7 @@ target_link_libraries(yql-plugin-native.global PUBLIC
providers-common-udf_resolve
providers-solomon-gateway
providers-solomon-provider
+ library-yql-core
yql-core-url_preprocessing
yt-gateway-native
yt-lib-log
diff --git a/yt/yql/plugin/native/error_helpers.cpp b/yt/yql/plugin/native/error_helpers.cpp
index 23b9fa5cba..8301c341f9 100644
--- a/yt/yql/plugin/native/error_helpers.cpp
+++ b/yt/yql/plugin/native/error_helpers.cpp
@@ -14,7 +14,7 @@ const int IssueToErrorCodesShift = 30000;
////////////////////////////////////////////////////////////////////////////////
-TString ExceptionToYtErrorYson(const std::exception& exception)
+TString MessageToYtErrorYson(const TString& message)
{
TStringStream yson;
::NYson::TYsonWriter writer(&yson);
@@ -23,7 +23,7 @@ TString ExceptionToYtErrorYson(const std::exception& exception)
writer.OnKeyedItem("code");
writer.OnInt64Scalar(1); // Generic error
writer.OnKeyedItem("message");
- writer.OnStringScalar(exception.what());
+ writer.OnStringScalar(message);
writer.OnKeyedItem("attributes");
writer.OnBeginMap();
writer.OnEndMap();
diff --git a/yt/yql/plugin/native/error_helpers.h b/yt/yql/plugin/native/error_helpers.h
index 1905502aa8..4e57cdba40 100644
--- a/yt/yql/plugin/native/error_helpers.h
+++ b/yt/yql/plugin/native/error_helpers.h
@@ -10,7 +10,7 @@ namespace NYT::NYqlPlugin {
TString IssuesToYtErrorYson(const NYql::TIssues& issues);
-TString ExceptionToYtErrorYson(const std::exception& exception);
+TString MessageToYtErrorYson(const TString& message);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp
index 32a632b59e..12a7586851 100644
--- a/yt/yql/plugin/native/plugin.cpp
+++ b/yt/yql/plugin/native/plugin.cpp
@@ -1,6 +1,7 @@
#include "plugin.h"
#include "error_helpers.h"
+#include "progress_merger.h"
#include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
@@ -26,8 +27,9 @@
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/logging/logger.h>
-#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/yson/parser.h>
#include <library/cpp/yson/writer.h>
@@ -47,6 +49,14 @@ using namespace NYson;
////////////////////////////////////////////////////////////////////////////////
+struct TActiveQuery
+{
+ TProgressMerger ProgressMerger;
+ std::optional<TString> Plan;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TYqlPlugin
: public IYqlPlugin
{
@@ -171,7 +181,7 @@ public:
YQL_LOG(INFO) << "YQL plugin initialized";
}
- TQueryResult GuardedRun(TString impersonationUser, TString queryText, TYsonString settings)
+ TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings)
{
auto credentials = MakeIntrusive<NYql::TCredentials>();
if (YTTokenPath_) {
@@ -187,6 +197,19 @@ public:
auto program = ProgramFactory_->Create("-memory-", queryText);
program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
+ auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
+ if (!maybeStr) {
+ return std::nullopt;
+ }
+ return *maybeStr;
+ };
+
+ program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress);
+ ActiveQueriesProgress_[queryId].Plan = maybeToOptional(program->GetQueryPlan());
+ });
+
NSQLTranslation::TTranslationSettings sqlSettings;
sqlSettings.ClusterMapping = Clusters_;
sqlSettings.ModuleMapping = Modules_;
@@ -228,28 +251,51 @@ public:
yson.OnEndList();
}
- auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
- if (!maybeStr) {
- return std::nullopt;
- }
- return *maybeStr;
- };
+ TString progress;
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
+ ActiveQueriesProgress_.erase(queryId);
+ }
return {
.YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
.Plan = maybeToOptional(program->GetQueryPlan()),
.Statistics = maybeToOptional(program->GetStatistics()),
+ .Progress = progress,
.TaskInfo = maybeToOptional(program->GetTasksInfo()),
};
}
- TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept override
+ TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings) noexcept override
{
try {
- return GuardedRun(impersonationUser, queryText, settings);
+ return GuardedRun(queryId, impersonationUser, queryText, settings);
} catch (const std::exception& ex) {
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_.erase(queryId);
+ }
+
+ return TQueryResult{
+ .YsonError = MessageToYtErrorYson(ex.what()),
+ };
+ }
+ }
+
+ TQueryResult GetProgress(TQueryId queryId) noexcept override
+ {
+ auto guard = ReaderGuard(ProgressSpinLock);
+ if (ActiveQueriesProgress_.contains(queryId)) {
+ TQueryResult result;
+ if (ActiveQueriesProgress_[queryId].ProgressMerger.HasChangesSinceLastFlush()) {
+ result.Plan = ActiveQueriesProgress_[queryId].Plan;
+ result.Progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
+ }
+ return result;
+ } else {
return TQueryResult{
- .YsonError = ExceptionToYtErrorYson(ex),
+ .YsonError = MessageToYtErrorYson(Format("No progress for queryId: %v", queryId)),
};
}
}
@@ -266,6 +312,8 @@ private:
std::optional<TString> DefaultCluster_;
THashMap<TString, TString> Modules_;
TYsonString OperationAttributes_;
+ YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ProgressSpinLock);
+ THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_;
TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
{
diff --git a/yt/yql/plugin/native/progress_merger.cpp b/yt/yql/plugin/native/progress_merger.cpp
new file mode 100644
index 0000000000..3a0a02604e
--- /dev/null
+++ b/yt/yql/plugin/native/progress_merger.cpp
@@ -0,0 +1,116 @@
+#include "progress_merger.h"
+
+
+namespace NYT::NYqlPlugin {
+
+//////////////////////////////////////////////////////////////////////////////
+
+TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p)
+ : TNodeProgressBase(p)
+{}
+
+void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const
+{
+ writer.OnBeginMap();
+ {
+ writer.OnKeyedItem("category");
+ writer.OnStringScalar(Progress_.Category);
+
+ writer.OnKeyedItem("state");
+ writer.OnStringScalar(ToString(Progress_.State));
+
+ writer.OnKeyedItem("remoteId");
+ writer.OnStringScalar(Progress_.RemoteId);
+
+ writer.OnKeyedItem("stages");
+ writer.OnBeginMap();
+ for (size_t index = 0; index < Stages_.size(); index++) {
+ writer.OnKeyedItem(ToString(index));
+ writer.OnBeginMap();
+ {
+ writer.OnKeyedItem(Stages_[index].first);
+ writer.OnStringScalar(Stages_[index].second.ToString());
+ }
+ writer.OnEndMap();
+ }
+ writer.OnEndMap();
+
+ if (Progress_.Counters) {
+ writer.OnKeyedItem("completed");
+ writer.OnUint64Scalar(Progress_.Counters->Completed);
+
+ writer.OnKeyedItem("running");
+ writer.OnUint64Scalar(Progress_.Counters->Running);
+
+ writer.OnKeyedItem("total");
+ writer.OnUint64Scalar(Progress_.Counters->Total);
+
+ writer.OnKeyedItem("aborted");
+ writer.OnUint64Scalar(Progress_.Counters->Aborted);
+
+ writer.OnKeyedItem("failed");
+ writer.OnUint64Scalar(Progress_.Counters->Failed);
+
+ writer.OnKeyedItem("lost");
+ writer.OnUint64Scalar(Progress_.Counters->Lost);
+
+ writer.OnKeyedItem("pending");
+ writer.OnUint64Scalar(Progress_.Counters->Pending);
+ }
+
+ writer.OnKeyedItem("startedAt");
+ writer.OnStringScalar(StartedAt_.ToString());
+
+ if (FinishedAt_ != TInstant::Max()) {
+ writer.OnKeyedItem("finishedAt");
+ writer.OnStringScalar(FinishedAt_.ToString());
+ }
+ }
+ writer.OnEndMap();
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress)
+{
+ auto in = NodesMap_.emplace(progress.Id, progress);
+ if (!in.second) {
+ in.first->second.MergeWith(progress);
+ }
+ HasChanges_ = true;
+}
+
+void TProgressMerger::AbortAllUnfinishedNodes()
+{
+ for (auto& node: NodesMap_) {
+ if (node.second.IsUnfinished()) {
+ node.second.Abort();
+ HasChanges_ = true;
+ }
+ }
+}
+
+TString TProgressMerger::ToYsonString()
+{
+ TStringStream yson;
+ ::NYson::TYsonWriter writer(&yson);
+
+ writer.OnBeginMap();
+ for (auto& node: NodesMap_) {
+ writer.OnKeyedItem(ToString(node.first));
+ node.second.Serialize(writer);
+ }
+ writer.OnEndMap();
+ HasChanges_ = false;
+
+ return yson.Str();
+}
+
+bool TProgressMerger::HasChangesSinceLastFlush() const
+{
+ return HasChanges_;
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYqlPlugin
diff --git a/yt/yql/plugin/native/progress_merger.h b/yt/yql/plugin/native/progress_merger.h
new file mode 100644
index 0000000000..3bb5f101d0
--- /dev/null
+++ b/yt/yql/plugin/native/progress_merger.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <ydb/library/yql/core/yql_execution.h>
+#include <ydb/library/yql/core/progress_merger/progress_merger.h>
+
+#include <library/cpp/yson/writer.h>
+
+namespace NYT::NYqlPlugin {
+
+using namespace NYql::NProgressMerger;
+using namespace NYson;
+
+//////////////////////////////////////////////////////////////////////////////
+
+class TNodeProgress : public TNodeProgressBase {
+public:
+ TNodeProgress(const NYql::TOperationProgress& p);
+ void Serialize(::NYson::TYsonWriter& yson) const;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+
+class TProgressMerger : public ITaskProgressMerger {
+public:
+ void MergeWith(const NYql::TOperationProgress& progress) override;
+ void AbortAllUnfinishedNodes() override;
+
+ bool HasChangesSinceLastFlush() const;
+ TString ToYsonString();
+
+private:
+ bool HasChanges_ = false;
+ THashMap<ui32, TNodeProgress> NodesMap_;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYqlPlugin
diff --git a/yt/yql/plugin/native/ya.make b/yt/yql/plugin/native/ya.make
index 36d562a4d2..15f3851411 100644
--- a/yt/yql/plugin/native/ya.make
+++ b/yt/yql/plugin/native/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
GLOBAL plugin.cpp
error_helpers.cpp
+ progress_merger.cpp
)
PEERDIR(
@@ -19,6 +20,7 @@ PEERDIR(
ydb/library/yql/core/file_storage
ydb/library/yql/core/file_storage/proto
ydb/library/yql/core/file_storage/http_download
+ ydb/library/yql/core/progress_merger
ydb/library/yql/core/services/mounts
ydb/library/yql/core/user_data
ydb/library/yql/minikql
@@ -30,6 +32,7 @@ PEERDIR(
ydb/library/yql/providers/common/udf_resolve
ydb/library/yql/providers/solomon/gateway
ydb/library/yql/providers/solomon/provider
+ ydb/library/yql/core
ydb/library/yql/core/url_preprocessing
ydb/library/yql/providers/yt/gateway/native
ydb/library/yql/providers/yt/lib/log