aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql/plugin
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-12-09 16:44:26 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-12-09 16:59:54 +0300
commit69ca112108d4c388009f59ddb89cc814dca030c7 (patch)
tree2a6f07069a200e694334e279375ba0190725c157 /yt/yql/plugin
parentf882cdce886827d7e7f55e154329c10d2080960e (diff)
downloadydb-69ca112108d4c388009f59ddb89cc814dca030c7.tar.gz
Intermediate changes
Diffstat (limited to 'yt/yql/plugin')
-rw-r--r--yt/yql/plugin/bridge/interface.h22
-rw-r--r--yt/yql/plugin/bridge/plugin.cpp55
-rw-r--r--yt/yql/plugin/dynamic/dylib.exports2
-rw-r--r--yt/yql/plugin/dynamic/impl.cpp28
-rw-r--r--yt/yql/plugin/native/error_helpers.h4
-rw-r--r--yt/yql/plugin/native/plugin.cpp62
-rw-r--r--yt/yql/plugin/native/plugin.h1
-rw-r--r--yt/yql/plugin/plugin.h31
8 files changed, 174 insertions, 31 deletions
diff --git a/yt/yql/plugin/bridge/interface.h b/yt/yql/plugin/bridge/interface.h
index 1c3a0cd291..f685e46688 100644
--- a/yt/yql/plugin/bridge/interface.h
+++ b/yt/yql/plugin/bridge/interface.h
@@ -82,10 +82,24 @@ struct TBridgeQueryFile
EQueryFileContentType Type;
};
+struct TBridgeAbortResult
+{
+ const char* YsonError = nullptr;
+ ssize_t YsonErrorLength = 0;
+};
+
using TFuncBridgeFreeQueryResult = void(TBridgeQueryResult* result);
-using TFuncBridgeRun = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* files, int fileCount);
+using TFuncBridgeRun = TBridgeQueryResult*(
+ TBridgeYqlPlugin* plugin,
+ const char* queryId,
+ const char* impersonationUser,
+ const char* queryText,
+ const char* settings,
+ const TBridgeQueryFile* files,
+ int fileCount);
using TFuncBridgeGetProgress = TBridgeQueryResult*(TBridgeYqlPlugin* plugin, const char* queryId);
-using TFuncBridgeAbort = void(TBridgeYqlPlugin* plugin, const char* queryId);
+using TFuncBridgeAbort = TBridgeAbortResult*(TBridgeYqlPlugin* plugin, const char* queryId);
+using TFuncBridgeFreeAbortResult = void(TBridgeAbortResult* result);
////////////////////////////////////////////////////////////////////////////////
@@ -95,6 +109,8 @@ using TFuncBridgeAbort = void(TBridgeYqlPlugin* plugin, const char* queryId);
XX(BridgeFreeQueryResult) \
XX(BridgeRun) \
XX(BridgeGetProgress) \
- XX(BridgeGetAbiVersion)
+ XX(BridgeGetAbiVersion) \
+ XX(BridgeAbort) \
+ XX(BridgeFreeAbortResult)
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/plugin/bridge/plugin.cpp b/yt/yql/plugin/bridge/plugin.cpp
index 3549779474..95f7de7a76 100644
--- a/yt/yql/plugin/bridge/plugin.cpp
+++ b/yt/yql/plugin/bridge/plugin.cpp
@@ -58,6 +58,12 @@ public:
} else { \
function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \
} \
+ } else if constexpr(#function == TStringBuf("BridgeFreeAbortResult")) { \
+ if (AbiVersion_ < EYqlPluginAbiVersion::AbortQuery) { \
+ function = reinterpret_cast<TFunc ## function*>(FreeAbortResultStub); \
+ } else { \
+ function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \
+ } \
} else { \
function = reinterpret_cast<TFunc ## function*>(Library_.Sym(#function)); \
} \
@@ -69,8 +75,6 @@ public:
GetYqlPluginAbiVersion();
FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX);
- // COMPAT(gritukan): Remove after commit in YDB repository.
- XX(BridgeAbort)
#undef XX
}
@@ -81,15 +85,18 @@ protected:
#define XX(function) TFunc ## function* function;
FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX)
-
- // COMPAT(gritukan): Remove after commit in YDB repository.
- XX(BridgeAbort)
#undef XX
// COMPAT(gritukan): AbortQuery
- static void AbortQueryStub(TBridgeYqlPlugin* /*plugin*/, const char* /*queryId*/)
+ static TBridgeAbortResult* AbortQueryStub(TBridgeYqlPlugin* /*plugin*/, const char* /*queryId*/)
{
// Just do nothing. It is not worse than in used to be before.
+ return nullptr;
+ }
+
+ static void FreeAbortResultStub(TBridgeAbortResult* /*result*/)
+ {
+ YT_ABORT();
}
void GetYqlPluginAbiVersion()
@@ -138,7 +145,12 @@ public:
BridgePlugin_ = BridgeCreateYqlPlugin(&bridgeOptions);
}
- TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, NYson::TYsonString settings, std::vector<TQueryFile> files) noexcept override
+ TQueryResult Run(
+ TQueryId queryId,
+ TString impersonationUser,
+ TString queryText,
+ NYson::TYsonString settings,
+ std::vector<TQueryFile> files) noexcept override
{
auto settingsString = settings ? settings.ToString() : "{}";
auto queryIdStr = ToString(queryId);
@@ -155,8 +167,15 @@ public:
});
}
- auto* bridgeQueryResult = BridgeRun(BridgePlugin_, queryIdStr.data(), impersonationUser.data(), queryText.data(), settingsString.data(), filesData.data(), filesData.size());
- TQueryResult queryResult = {
+ auto* bridgeQueryResult = BridgeRun(
+ BridgePlugin_,
+ queryIdStr.data(),
+ impersonationUser.data(),
+ queryText.data(),
+ settingsString.data(),
+ filesData.data(),
+ filesData.size());
+ TQueryResult queryResult{
.YsonResult = ToString(bridgeQueryResult->YsonResult, bridgeQueryResult->YsonResultLength),
.Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),
.Statistics = ToString(bridgeQueryResult->Statistics, bridgeQueryResult->StatisticsLength),
@@ -172,7 +191,7 @@ public:
{
auto queryIdStr = ToString(queryId);
auto* bridgeQueryResult = BridgeGetProgress(BridgePlugin_, queryIdStr.data());
- TQueryResult queryResult = {
+ TQueryResult queryResult{
.Plan = ToString(bridgeQueryResult->Plan, bridgeQueryResult->PlanLength),
.Progress = ToString(bridgeQueryResult->Progress, bridgeQueryResult->ProgressLength),
};
@@ -180,6 +199,22 @@ public:
return queryResult;
}
+ TAbortResult Abort(TQueryId queryId) noexcept override
+ {
+ auto queryIdStr = ToString(queryId);
+ auto* bridgeAbortResult = BridgeAbort(BridgePlugin_, queryIdStr.data());
+ // COMPAT(gritukan): AbortQuery
+ if (!bridgeAbortResult) {
+ return {};
+ }
+
+ TAbortResult abortResult{
+ .YsonError = ToString(bridgeAbortResult->YsonError, bridgeAbortResult->YsonErrorLength),
+ };
+ BridgeFreeAbortResult(bridgeAbortResult);
+ return abortResult;
+ }
+
~TYqlPlugin() override
{
BridgeFreeYqlPlugin(BridgePlugin_);
diff --git a/yt/yql/plugin/dynamic/dylib.exports b/yt/yql/plugin/dynamic/dylib.exports
index e4dd9fc44e..743e71c650 100644
--- a/yt/yql/plugin/dynamic/dylib.exports
+++ b/yt/yql/plugin/dynamic/dylib.exports
@@ -5,6 +5,8 @@ BridgeFreeQueryResult
BridgeRun
BridgeGetProgress
BridgeGetAbiVersion
+BridgeAbort
+BridgeFreeAbortResult
# YQL <-> YQL UDFs interface.
UdfAllocateWithSize
diff --git a/yt/yql/plugin/dynamic/impl.cpp b/yt/yql/plugin/dynamic/impl.cpp
index aba5a5bc66..c63f9fb035 100644
--- a/yt/yql/plugin/dynamic/impl.cpp
+++ b/yt/yql/plugin/dynamic/impl.cpp
@@ -12,7 +12,7 @@ extern "C" {
ssize_t BridgeGetAbiVersion()
{
- return 0;
+ return 1; // EYqlPluginAbiVersion::AbortQuery
}
TBridgeYqlPlugin* BridgeCreateYqlPlugin(const TBridgeYqlPluginOptions* bridgeOptions)
@@ -68,7 +68,14 @@ void FillString(const char*& str, ssize_t& strLength, const std::optional<TStrin
strLength = original->size();
}
-TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* bridgeFiles, int bridgeFileCount)
+TBridgeQueryResult* BridgeRun(
+ TBridgeYqlPlugin* plugin,
+ const char* queryId,
+ const char* impersonationUser,
+ const char* queryText,
+ const char* settings,
+ const TBridgeQueryFile* bridgeFiles,
+ int bridgeFileCount)
{
static const auto EmptyMap = TYsonString(TString("{}"));
@@ -113,6 +120,23 @@ TBridgeQueryResult* BridgeGetProgress(TBridgeYqlPlugin* plugin, const char* quer
return bridgeResult;
}
+TBridgeAbortResult* BridgeAbort(TBridgeYqlPlugin* plugin, const char* queryId)
+{
+ auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
+ auto* bridgeResult = new TBridgeAbortResult;
+
+ auto result = nativePlugin->GetProgress(NYT::TGuid::FromString(queryId));
+ FillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError);
+
+ return bridgeResult;
+}
+
+void BridgeFreeAbortResult(TBridgeAbortResult* result)
+{
+ delete result->YsonError;
+ delete result;
+}
+
////////////////////////////////////////////////////////////////////////////////
// Validate that the all functions from the bridge interface are implemented with proper signatures.
diff --git a/yt/yql/plugin/native/error_helpers.h b/yt/yql/plugin/native/error_helpers.h
index 4e57cdba40..ac75047cbe 100644
--- a/yt/yql/plugin/native/error_helpers.h
+++ b/yt/yql/plugin/native/error_helpers.h
@@ -1,9 +1,9 @@
#pragma once
-#include <util/generic/string.h>
-
#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <util/generic/string.h>
+
namespace NYT::NYqlPlugin {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp
index ce8a4a1694..60a0284aa7 100644
--- a/yt/yql/plugin/native/plugin.cpp
+++ b/yt/yql/plugin/native/plugin.cpp
@@ -8,7 +8,7 @@
#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
-#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
@@ -40,11 +40,15 @@
#include <library/cpp/yson/writer.h>
#include <library/cpp/digest/md5/md5.h>
+
#include <library/cpp/resource/resource.h>
#include <util/folder/path.h>
+
#include <util/stream/file.h>
+
#include <util/string/builder.h>
+
#include <util/system/fs.h>
namespace NYT::NYqlPlugin {
@@ -72,6 +76,9 @@ struct TQueryPlan
struct TActiveQuery
{
+ NYql::TProgramPtr Program;
+ bool Compiled = false;
+
TProgressMerger ProgressMerger;
std::optional<TString> Plan;
};
@@ -242,9 +249,19 @@ public:
YQL_LOG(INFO) << "YQL plugin initialized";
}
- TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files)
+ TQueryResult GuardedRun(
+ TQueryId queryId,
+ TString impersonationUser,
+ TString queryText,
+ TYsonString settings,
+ std::vector<TQueryFile> files)
{
auto program = ProgramFactory_->Create("-memory-", queryText);
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_[queryId].Program = program;
+ }
+
program->AddCredentials({{"impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser)}});
program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
@@ -291,6 +308,11 @@ public:
};
}
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ ActiveQueriesProgress_[queryId].Compiled = true;
+ }
+
NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
status = program->RunWithConfig(impersonationUser, pipelineConfigurator);
@@ -327,7 +349,12 @@ public:
};
}
- TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override
+ TQueryResult Run(
+ TQueryId queryId,
+ TString impersonationUser,
+ TString queryText,
+ TYsonString settings,
+ std::vector<TQueryFile> files) noexcept override
{
try {
return GuardedRun(queryId, impersonationUser, queryText, settings, files);
@@ -360,6 +387,35 @@ public:
}
}
+ TAbortResult Abort(TQueryId queryId) noexcept override
+ {
+ NYql::TProgramPtr program;
+ {
+ auto guard = WriterGuard(ProgressSpinLock);
+ if (!ActiveQueriesProgress_.contains(queryId)) {
+ return TAbortResult{
+ .YsonError = MessageToYtErrorYson(Format("Query %v is not found", queryId)),
+ };
+ }
+ if (!ActiveQueriesProgress_[queryId].Compiled) {
+ return TAbortResult{
+ .YsonError = MessageToYtErrorYson(Format("Query %v is not compiled", queryId)),
+ };
+ }
+ program = ActiveQueriesProgress_[queryId].Program;
+ }
+
+ try {
+ program->Abort().GetValueSync();
+ } catch (...) {
+ return TAbortResult{
+ .YsonError = MessageToYtErrorYson(Format("Failed to abort query %v: %v", queryId, CurrentExceptionMessage())),
+ };
+ }
+
+ return {};
+ }
+
private:
NYql::TFileStoragePtr FileStorage_;
NYql::TExprContext ExprContext_;
diff --git a/yt/yql/plugin/native/plugin.h b/yt/yql/plugin/native/plugin.h
index 2869a2d035..c930e56ba1 100644
--- a/yt/yql/plugin/native/plugin.h
+++ b/yt/yql/plugin/native/plugin.h
@@ -2,7 +2,6 @@
#include <yt/yql/plugin/plugin.h>
-
namespace NYT::NYqlPlugin {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yql/plugin/plugin.h b/yt/yql/plugin/plugin.h
index 9c9cc50798..3fcc8e3e16 100644
--- a/yt/yql/plugin/plugin.h
+++ b/yt/yql/plugin/plugin.h
@@ -2,21 +2,19 @@
#include <yt/yql/plugin/bridge/interface.h>
-#include <util/generic/hash.h>
-#include <util/generic/string.h>
-
#include <library/cpp/logger/log.h>
#include <library/cpp/yt/string/guid.h>
#include <library/cpp/yt/yson_string/string.h>
+#include <util/generic/hash.h>
+#include <util/generic/string.h>
+
#include <optional>
namespace NYT::NYqlPlugin {
-using namespace NYson;
-
////////////////////////////////////////////////////////////////////////////////
using TQueryId = TGuid;
@@ -26,10 +24,10 @@ using TQueryId = TGuid;
class TYqlPluginOptions
{
public:
- TYsonString SingletonsConfig;
- TYsonString GatewayConfig;
- TYsonString FileStorageConfig;
- TYsonString OperationAttributes;
+ NYson::TYsonString SingletonsConfig;
+ NYson::TYsonString GatewayConfig;
+ NYson::TYsonString FileStorageConfig;
+ NYson::TYsonString OperationAttributes;
TString YTTokenPath;
@@ -57,6 +55,12 @@ struct TQueryFile
EQueryFileContentType Type;
};
+struct TAbortResult
+{
+ //! YSON representation of a YT error.
+ std::optional<TString> YsonError;
+};
+
//! This interface encapsulates YT <-> YQL integration.
//! There are two major implementation: one of them is based
//! on YQL code and another wraps the pure C bridge interface, which
@@ -66,9 +70,16 @@ struct TQueryFile
*/
struct IYqlPlugin
{
- virtual TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept = 0;
+ virtual TQueryResult Run(
+ TQueryId queryId,
+ TString impersonationUser,
+ TString queryText,
+ NYson::TYsonString settings,
+ std::vector<TQueryFile> files) noexcept = 0;
virtual TQueryResult GetProgress(TQueryId queryId) noexcept = 0;
+ virtual TAbortResult Abort(TQueryId queryId) noexcept = 0;
+
virtual ~IYqlPlugin() = default;
};