aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql/plugin/native
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-12-09 16:44:26 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-12-09 16:59:54 +0300
commit69ca112108d4c388009f59ddb89cc814dca030c7 (patch)
tree2a6f07069a200e694334e279375ba0190725c157 /yt/yql/plugin/native
parentf882cdce886827d7e7f55e154329c10d2080960e (diff)
downloadydb-69ca112108d4c388009f59ddb89cc814dca030c7.tar.gz
Intermediate changes
Diffstat (limited to 'yt/yql/plugin/native')
-rw-r--r--yt/yql/plugin/native/error_helpers.h4
-rw-r--r--yt/yql/plugin/native/plugin.cpp62
-rw-r--r--yt/yql/plugin/native/plugin.h1
3 files changed, 61 insertions, 6 deletions
diff --git a/yt/yql/plugin/native/error_helpers.h b/yt/yql/plugin/native/error_helpers.h
index 4e57cdba40..ac75047cbe 100644
--- a/yt/yql/plugin/native/error_helpers.h
+++ b/yt/yql/plugin/native/error_helpers.h
@@ -1,9 +1,9 @@
#pragma once
-#include <util/generic/string.h>
-
#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <util/generic/string.h>
+
namespace NYT::NYqlPlugin {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp
index ce8a4a1694..60a0284aa7 100644
--- a/yt/yql/plugin/native/plugin.cpp
+++ b/yt/yql/plugin/native/plugin.cpp
@@ -8,7 +8,7 @@
#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
-#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
@@ -40,11 +40,15 @@
#include <library/cpp/yson/writer.h>
#include <library/cpp/digest/md5/md5.h>
+
#include <library/cpp/resource/resource.h>
#include <util/folder/path.h>
+
#include <util/stream/file.h>
+
#include <util/string/builder.h>
+
#include <util/system/fs.h>
namespace NYT::NYqlPlugin {
@@ -72,6 +76,9 @@ struct TQueryPlan
struct TActiveQuery
{
+ NYql::TProgramPtr Program;
+ bool Compiled = false;
+
TProgressMerger ProgressMerger;
std::optional<TString> Plan;
};
@@ -242,9 +249,19 @@ public:
YQL_LOG(INFO) << "YQL plugin initialized";
}
- TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files)
+ TQueryResult GuardedRun(
+ TQueryId queryId,
+ TString impersonationUser,
+ TString queryText,
+ TYsonString settings,
+ std::vector<TQueryFile> files)
{
auto program = ProgramFactory_->Create("-memory-", queryText);
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_[queryId].Program = program;
+ }
+
program->AddCredentials({{"impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser)}});
program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
@@ -291,6 +308,11 @@ public:
};
}
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_[queryId].Compiled = true;
+ }
+
NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
status = program->RunWithConfig(impersonationUser, pipelineConfigurator);
@@ -327,7 +349,12 @@ public:
};
}
- TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override
+ TQueryResult Run(
+ TQueryId queryId,
+ TString impersonationUser,
+ TString queryText,
+ TYsonString settings,
+ std::vector<TQueryFile> files) noexcept override
{
try {
return GuardedRun(queryId, impersonationUser, queryText, settings, files);
@@ -360,6 +387,35 @@ public:
}
}
+ TAbortResult Abort(TQueryId queryId) noexcept override
+ {
+ NYql::TProgramPtr program;
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ if (!ActiveQueriesProgress_.contains(queryId)) {
+ return TAbortResult{
+ .YsonError = MessageToYtErrorYson(Format("Query %v is not found", queryId)),
+ };
+ }
+ if (!ActiveQueriesProgress_[queryId].Compiled) {
+ return TAbortResult{
+ .YsonError = MessageToYtErrorYson(Format("Query %v is not compiled", queryId)),
+ };
+ }
+ program = ActiveQueriesProgress_[queryId].Program;
+ }
+
+ try {
+ program->Abort().GetValueSync();
+ } catch (...) {
+ return TAbortResult{
+ .YsonError = MessageToYtErrorYson(Format("Failed to abort query %v: %v", queryId, CurrentExceptionMessage())),
+ };
+ }
+
+ return {};
+ }
+
private:
NYql::TFileStoragePtr FileStorage_;
NYql::TExprContext ExprContext_;
diff --git a/yt/yql/plugin/native/plugin.h b/yt/yql/plugin/native/plugin.h
index 2869a2d035..c930e56ba1 100644
--- a/yt/yql/plugin/native/plugin.h
+++ b/yt/yql/plugin/native/plugin.h
@@ -2,7 +2,6 @@
#include <yt/yql/plugin/plugin.h>
-
namespace NYT::NYqlPlugin {
////////////////////////////////////////////////////////////////////////////////