diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-09 16:44:26 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-09 16:59:54 +0300 |
commit | 69ca112108d4c388009f59ddb89cc814dca030c7 (patch) | |
tree | 2a6f07069a200e694334e279375ba0190725c157 | |
parent | f882cdce886827d7e7f55e154329c10d2080960e (diff) | |
download | ydb-69ca112108d4c388009f59ddb89cc814dca030c7.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yql/plugin/bridge/interface.h | 22 | ||||
-rw-r--r-- | yt/yql/plugin/bridge/plugin.cpp | 55 | ||||
-rw-r--r-- | yt/yql/plugin/dynamic/dylib.exports | 2 | ||||
-rw-r--r-- | yt/yql/plugin/dynamic/impl.cpp | 28 | ||||
-rw-r--r-- | yt/yql/plugin/native/error_helpers.h | 4 | ||||
-rw-r--r-- | yt/yql/plugin/native/plugin.cpp | 62 | ||||
-rw-r--r-- | yt/yql/plugin/native/plugin.h | 1 | ||||
-rw-r--r-- | yt/yql/plugin/plugin.h | 31 |
8 files changed, 174 insertions, 31 deletions
diff --git a/yt/yql/plugin/bridge/interface.h b/yt/yql/plugin/bridge/interface.h index 1c3a0cd291..f685e46688 100644 --- a/yt/yql/plugin/bridge/interface.h +++ b/yt/yql/plugin/bridge/interface.h @@ -82,10 +82,24 @@ struct TBridgeQueryFile EQueryFileContentType Type; }; +struct TBridgeAbortResult +{ + const char* YsonError = nullptr; + ssize_t YsonErrorLength = 0; +}; + using TFuncBridgeFreeQueryResult = void(TBridgeQueryResult* result); -using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* files, int fileCount); +using TFuncBridgeRun = TBridgeQueryResult*( + TBridgeYqlPlugin* plugin, + const char* queryId, + const char* impersonationUser, + const char* queryText, + const char* settings, + const TBridgeQueryFile* files, + int fileCount); using TFuncBridgeGetProgress = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId); -using TFuncBridgeAbort = void(TBridgeYqlPlugin* plugin, const char* queryId); +using TFuncBridgeAbort = TBridgeAbortResult*(TBridgeYqlPlugin* plugin, const char* queryId); +using TFuncBridgeFreeAbortResult = void(TBridgeAbortResult* result); //////////////////////////////////////////////////////////////////////////////// @@ -95,6 +109,8 @@ using TFuncBridgeAbort = void(TBridgeYqlPlugin* plugin, const char* queryId); XX(BridgeFreeQueryResult) \ XX(BridgeRun) \ XX(BridgeGetProgress) \ - XX(BridgeGetAbiVersion) + XX(BridgeGetAbiVersion) \ + XX(BridgeAbort) \ + XX(BridgeFreeAbortResult) //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yql/plugin/bridge/plugin.cpp b/yt/yql/plugin/bridge/plugin.cpp index 3549779474..95f7de7a76 100644 --- a/yt/yql/plugin/bridge/plugin.cpp +++ b/yt/yql/plugin/bridge/plugin.cpp @@ -58,6 +58,12 @@ public: } else { \ function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \ } \ + } else if constexpr(#function == TStringBuf("BridgeFreeAbortResult")) { \ + if (AbiVersion_ < EYqlPluginAbiVersion::AbortQuery) { \ + function = reinterpret_cast<TFunc ## function*>(FreeAbortResultStub); \ + } else { \ + function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \ + } \ } else { \ function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \ } \ @@ -69,8 +75,6 @@ public: GetYqlPluginAbiVersion(); FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX); - // COMPAT(gritukan): Remove after commit in YDB repository. - XX(BridgeAbort) #undef XX } @@ -81,15 +85,18 @@ protected: #define XX(function) TFunc ## function* function; FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX) - - // COMPAT(gritukan): Remove after commit in YDB repository. - XX(BridgeAbort) #undef XX // COMPAT(gritukan): AbortQuery - static void AbortQueryStub(TBridgeYqlPlugin* /*plugin*/, const char* /*queryId*/) + static TBridgeAbortResult* AbortQueryStub(TBridgeYqlPlugin* /*plugin*/, const char* /*queryId*/) { // Just do nothing. It is not worse than in used to be before. + return nullptr; + } + + static void FreeAbortResultStub(TBridgeAbortResult* /*result*/) + { + YT_ABORT(); } void GetYqlPluginAbiVersion() @@ -138,7 +145,12 @@ public: BridgePlugin_ = BridgeCreateYqlPlugin(&bridgeOptions); } - TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, NYson::TYsonString settings, std::vector<TQueryFile> files) noexcept override + TQueryResult Run( + TQueryId queryId, + TString impersonationUser, + TString queryText, + NYson::TYsonString settings, + std::vector<TQueryFile> files) noexcept override { auto settingsString = settings ? settings.ToString() : "{}"; auto queryIdStr = ToString(queryId); @@ -155,8 +167,15 @@ public: }); } - auto* bridgeQueryResult = BridgeRun(BridgePlugin_, queryIdStr.data(), impersonationUser.data(), queryText.data(), settingsString.data(), filesData.data(), filesData.size()); - TQueryResult queryResult = { + auto* bridgeQueryResult = BridgeRun( + BridgePlugin_, + queryIdStr.data(), + impersonationUser.data(), + queryText.data(), + settingsString.data(), + filesData.data(), + filesData.size()); + TQueryResult queryResult{ .YsonResult = ToString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength), .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength), .Statistics = ToString(bridgeQueryResult->Statistics, bridgeQueryResult->StatisticsLength), @@ -172,7 +191,7 @@ public: { auto queryIdStr = ToString(queryId); auto* bridgeQueryResult = BridgeGetProgress(BridgePlugin_, queryIdStr.data()); - TQueryResult queryResult = { + TQueryResult queryResult{ .Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength), .Progress = ToString(bridgeQueryResult->Progress, bridgeQueryResult->ProgressLength), }; @@ -180,6 +199,22 @@ public: return queryResult; } + TAbortResult Abort(TQueryId queryId) noexcept override + { + auto queryIdStr = ToString(queryId); + auto* bridgeAbortResult = BridgeAbort(BridgePlugin_, queryIdStr.data()); + // COMPAT(gritukan): AbortQuery + if (!bridgeAbortResult) { + return {}; + } + + TAbortResult abortResult{ + .YsonError = ToString(bridgeAbortResult->YsonError, bridgeAbortResult->YsonErrorLength), + }; + BridgeFreeAbortResult(bridgeAbortResult); + return abortResult; + } + ~TYqlPlugin() override { BridgeFreeYqlPlugin(BridgePlugin_); diff --git a/yt/yql/plugin/dynamic/dylib.exports b/yt/yql/plugin/dynamic/dylib.exports index e4dd9fc44e..743e71c650 100644 --- a/yt/yql/plugin/dynamic/dylib.exports +++ b/yt/yql/plugin/dynamic/dylib.exports @@ -5,6 +5,8 @@ BridgeFreeQueryResult BridgeRun BridgeGetProgress BridgeGetAbiVersion +BridgeAbort +BridgeFreeAbortResult # YQL <-> YQL UDFs interface. UdfAllocateWithSize diff --git a/yt/yql/plugin/dynamic/impl.cpp b/yt/yql/plugin/dynamic/impl.cpp index aba5a5bc66..c63f9fb035 100644 --- a/yt/yql/plugin/dynamic/impl.cpp +++ b/yt/yql/plugin/dynamic/impl.cpp @@ -12,7 +12,7 @@ extern "C" { ssize_t BridgeGetAbiVersion() { - return 0; + return 1; // EYqlPluginAbiVersion::AbortQuery } TBridgeYqlPlugin* BridgeCreateYqlPlugin(const TBridgeYqlPluginOptions* bridgeOptions) @@ -68,7 +68,14 @@ void FillString(const char*& str, ssize_t& strLength, const std::optional<TStrin strLength = original->size(); } -TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* bridgeFiles, int bridgeFileCount) +TBridgeQueryResult* BridgeRun( + TBridgeYqlPlugin* plugin, + const char* queryId, + const char* impersonationUser, + const char* queryText, + const char* settings, + const TBridgeQueryFile* bridgeFiles, + int bridgeFileCount) { static const auto EmptyMap = TYsonString(TString("{}")); @@ -113,6 +120,23 @@ TBridgeQueryResult* BridgeGetProgress(TBridgeYqlPlugin* plugin, const char* quer return bridgeResult; } +TBridgeAbortResult* BridgeAbort(TBridgeYqlPlugin* plugin, const char* queryId) +{ + auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin); + auto* bridgeResult = new TBridgeAbortResult; + + auto result = nativePlugin->GetProgress(NYT::TGuid::FromString(queryId)); + FillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError); + + return bridgeResult; +} + +void BridgeFreeAbortResult(TBridgeAbortResult* result) +{ + delete result->YsonError; + delete result; +} + //////////////////////////////////////////////////////////////////////////////// // Validate that the all functions from the bridge interface are implemented with proper signatures. diff --git a/yt/yql/plugin/native/error_helpers.h b/yt/yql/plugin/native/error_helpers.h index 4e57cdba40..ac75047cbe 100644 --- a/yt/yql/plugin/native/error_helpers.h +++ b/yt/yql/plugin/native/error_helpers.h @@ -1,9 +1,9 @@ #pragma once -#include <util/generic/string.h> - #include <ydb/library/yql/public/issue/yql_issue.h> +#include <util/generic/string.h> + namespace NYT::NYqlPlugin { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp index ce8a4a1694..60a0284aa7 100644 --- a/yt/yql/plugin/native/plugin.cpp +++ b/yt/yql/plugin/native/plugin.cpp @@ -8,7 +8,7 @@ #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h> #include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> -#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h" +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> @@ -40,11 +40,15 @@ #include <library/cpp/yson/writer.h> #include <library/cpp/digest/md5/md5.h> + #include <library/cpp/resource/resource.h> #include <util/folder/path.h> + #include <util/stream/file.h> + #include <util/string/builder.h> + #include <util/system/fs.h> namespace NYT::NYqlPlugin { @@ -72,6 +76,9 @@ struct TQueryPlan struct TActiveQuery { + NYql::TProgramPtr Program; + bool Compiled = false; + TProgressMerger ProgressMerger; std::optional<TString> Plan; }; @@ -242,9 +249,19 @@ public: YQL_LOG(INFO) << "YQL plugin initialized"; } - TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) + TQueryResult GuardedRun( + TQueryId queryId, + TString impersonationUser, + TString queryText, + TYsonString settings, + std::vector<TQueryFile> files) { auto program = ProgramFactory_->Create("-memory-", queryText); + { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_[queryId].Program = program; + } + program->AddCredentials({{"impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser)}}); program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings)); @@ -291,6 +308,11 @@ public: }; } + { + auto guard = WriterGuard(ProgressSpinLock); + ActiveQueriesProgress_[queryId].Compiled = true; + } + NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error; status = program->RunWithConfig(impersonationUser, pipelineConfigurator); @@ -327,7 +349,12 @@ public: }; } - TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override + TQueryResult Run( + TQueryId queryId, + TString impersonationUser, + TString queryText, + TYsonString settings, + std::vector<TQueryFile> files) noexcept override { try { return GuardedRun(queryId, impersonationUser, queryText, settings, files); @@ -360,6 +387,35 @@ public: } } + TAbortResult Abort(TQueryId queryId) noexcept override + { + NYql::TProgramPtr program; + { + auto guard = WriterGuard(ProgressSpinLock); + if (!ActiveQueriesProgress_.contains(queryId)) { + return TAbortResult{ + .YsonError = MessageToYtErrorYson(Format("Query %v is not found", queryId)), + }; + } + if (!ActiveQueriesProgress_[queryId].Compiled) { + return TAbortResult{ + .YsonError = MessageToYtErrorYson(Format("Query %v is not compiled", queryId)), + }; + } + program = ActiveQueriesProgress_[queryId].Program; + } + + try { + program->Abort().GetValueSync(); + } catch (...) { + return TAbortResult{ + .YsonError = MessageToYtErrorYson(Format("Failed to abort query %v: %v", queryId, CurrentExceptionMessage())), + }; + } + + return {}; + } + private: NYql::TFileStoragePtr FileStorage_; NYql::TExprContext ExprContext_; diff --git a/yt/yql/plugin/native/plugin.h b/yt/yql/plugin/native/plugin.h index 2869a2d035..c930e56ba1 100644 --- a/yt/yql/plugin/native/plugin.h +++ b/yt/yql/plugin/native/plugin.h @@ -2,7 +2,6 @@ #include <yt/yql/plugin/plugin.h> - namespace NYT::NYqlPlugin { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yql/plugin/plugin.h b/yt/yql/plugin/plugin.h index 9c9cc50798..3fcc8e3e16 100644 --- a/yt/yql/plugin/plugin.h +++ b/yt/yql/plugin/plugin.h @@ -2,21 +2,19 @@ #include <yt/yql/plugin/bridge/interface.h> -#include <util/generic/hash.h> -#include <util/generic/string.h> - #include <library/cpp/logger/log.h> #include <library/cpp/yt/string/guid.h> #include <library/cpp/yt/yson_string/string.h> +#include <util/generic/hash.h> +#include <util/generic/string.h> + #include <optional> namespace NYT::NYqlPlugin { -using namespace NYson; - //////////////////////////////////////////////////////////////////////////////// using TQueryId = TGuid; @@ -26,10 +24,10 @@ using TQueryId = TGuid; class TYqlPluginOptions { public: - TYsonString SingletonsConfig; - TYsonString GatewayConfig; - TYsonString FileStorageConfig; - TYsonString OperationAttributes; + NYson::TYsonString SingletonsConfig; + NYson::TYsonString GatewayConfig; + NYson::TYsonString FileStorageConfig; + NYson::TYsonString OperationAttributes; TString YTTokenPath; @@ -57,6 +55,12 @@ struct TQueryFile EQueryFileContentType Type; }; +struct TAbortResult +{ + //! YSON representation of a YT error. + std::optional<TString> YsonError; +}; + //! This interface encapsulates YT <-> YQL integration. //! There are two major implementation: one of them is based //! on YQL code and another wraps the pure C bridge interface, which @@ -66,9 +70,16 @@ struct TQueryFile */ struct IYqlPlugin { - virtual TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept = 0; + virtual TQueryResult Run( + TQueryId queryId, + TString impersonationUser, + TString queryText, + NYson::TYsonString settings, + std::vector<TQueryFile> files) noexcept = 0; virtual TQueryResult GetProgress(TQueryId queryId) noexcept = 0; + virtual TAbortResult Abort(TQueryId queryId) noexcept = 0; + virtual ~IYqlPlugin() = default; }; |