diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-09 16:44:26 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-09 16:59:54 +0300 |
commit | 69ca112108d4c388009f59ddb89cc814dca030c7 (patch) | |
tree | 2a6f07069a200e694334e279375ba0190725c157 /yt/yql/plugin/native/plugin.cpp | |
parent | f882cdce886827d7e7f55e154329c10d2080960e (diff) | |
download | ydb-69ca112108d4c388009f59ddb89cc814dca030c7.tar.gz |
Intermediate changes
Diffstat (limited to 'yt/yql/plugin/native/plugin.cpp')
-rw-r--r-- | yt/yql/plugin/native/plugin.cpp | 62 |
1 files changed, 59 insertions, 3 deletions
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp index ce8a4a1694..60a0284aa7 100644 --- a/yt/yql/plugin/native/plugin.cpp +++ b/yt/yql/plugin/native/plugin.cpp @@ -8,7 +8,7 @@ #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h> #include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> -#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h" +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> @@ -40,11 +40,15 @@ #include <library/cpp/yson/writer.h> #include <library/cpp/digest/md5/md5.h> + #include <library/cpp/resource/resource.h> #include <util/folder/path.h> + #include <util/stream/file.h> + #include <util/string/builder.h> + #include <util/system/fs.h> namespace NYT::NYqlPlugin { @@ -72,6 +76,9 @@ struct TQueryPlan struct TActiveQuery { + NYql::TProgramPtr Program; + bool Compiled = false; + TProgressMerger ProgressMerger; std::optional<TString> Plan; }; @@ -242,9 +249,19 @@ public: YQL_LOG(INFO) << "YQL plugin initialized"; } - TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) + TQueryResult GuardedRun( + TQueryId queryId, + TString impersonationUser, + TString queryText, + TYsonString settings, + std::vector<TQueryFile> files) { auto program = ProgramFactory_->Create("-memory-", queryText); + { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_[queryId].Program = program; + } + program->AddCredentials({{"impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser)}}); program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings)); @@ -291,6 +308,11 @@ public: }; } + { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_[queryId].Compiled = true; + } + NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error; status = program->RunWithConfig(impersonationUser, pipelineConfigurator); @@ -327,7 +349,12 @@ public: }; } - TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override + TQueryResult Run( + TQueryId queryId, + TString impersonationUser, + TString queryText, + TYsonString settings, + std::vector<TQueryFile> files) noexcept override { try { return GuardedRun(queryId, impersonationUser, queryText, settings, files); @@ -360,6 +387,35 @@ public: } } + TAbortResult Abort(TQueryId queryId) noexcept override + { + NYql::TProgramPtr program; + { + auto guard = WriterGuard(ProgressSpinLock); + if (!ActiveQueriesProgress_.contains(queryId)) { + return TAbortResult{ + .YsonError = MessageToYtErrorYson(Format("Query %v is not found", queryId)), + }; + } + if (!ActiveQueriesProgress_[queryId].Compiled) { + return TAbortResult{ + .YsonError = MessageToYtErrorYson(Format("Query %v is not compiled", queryId)), + }; + } + program = ActiveQueriesProgress_[queryId].Program; + } + + try { + program->Abort().GetValueSync(); + } catch (...) { + return TAbortResult{ + .YsonError = MessageToYtErrorYson(Format("Failed to abort query %v: %v", queryId, CurrentExceptionMessage())), + }; + } + + return {}; + } + private: NYql::TFileStoragePtr FileStorage_; NYql::TExprContext ExprContext_; |