aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql/plugin/native/plugin.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/yql/plugin/native/plugin.cpp
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/yql/plugin/native/plugin.cpp')
-rw-r--r--yt/yql/plugin/native/plugin.cpp293
1 files changed, 293 insertions, 0 deletions
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp
new file mode 100644
index 00000000000..086f63a2f9c
--- /dev/null
+++ b/yt/yql/plugin/native/plugin.cpp
@@ -0,0 +1,293 @@
+#include "plugin.h"
+
+#include "error_helpers.h"
+
+#include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
+#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
+
+#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
+
+#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
+#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/core/facade/yql_facade.h>
+#include <ydb/library/yql/core/file_storage/file_storage.h>
+#include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h"
+#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/logging/logger.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/yson/parser.h>
+#include <library/cpp/yson/writer.h>
+
+#include <library/cpp/resource/resource.h>
+#include <library/cpp/digest/md5/md5.h>
+
+#include <util/folder/path.h>
+#include <util/stream/file.h>
+#include <util/string/builder.h>
+#include <util/system/fs.h>
+#include <util/system/user.h>
+
+namespace NYT::NYqlPlugin {
+namespace NNative {
+
+using namespace NYson;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TYqlPlugin
+ : public IYqlPlugin
+{
+public:
+ TYqlPlugin(TYqlPluginOptions& options)
+ {
+ try {
+ NYql::NLog::InitLogger(std::move(options.LogBackend));
+
+ auto& logger = NYql::NLog::YqlLogger();
+
+ logger.SetDefaultPriority(ELogPriority::TLOG_DEBUG);
+ for (int i = 0; i < NYql::NLog::EComponentHelpers::ToInt(NYql::NLog::EComponent::MaxValue); ++i) {
+ logger.SetComponentLevel((NYql::NLog::EComponent) i, NYql::NLog::ELevel::DEBUG);
+ }
+
+ NYql::SetYtLoggerGlobalBackend(NYT::ILogger::ELevel::DEBUG);
+ if (NYT::TConfig::Get()->Prefix.empty()) {
+ NYT::TConfig::Get()->Prefix = "//";
+ }
+
+ auto yqlCoreFlags = GatewaysConfig_.GetYqlCore()
+ .GetFlags();
+
+ auto ytConfig = GatewaysConfig_.MutableYt();
+ if (!ytConfig->HasExecuteUdfLocallyIfPossible()) {
+ ytConfig->SetExecuteUdfLocallyIfPossible(true);
+ }
+
+ ytConfig->SetYtLogLevel(NYql::EYtLogLevel::YL_DEBUG);
+ ytConfig->SetMrJobBin(options.MRJobBinary);
+ ytConfig->SetMrJobBinMd5(MD5::File(options.MRJobBinary));
+
+ ytConfig->ClearMrJobUdfsDir();
+
+ for (const auto& [cluster, address]: options.Clusters) {
+ auto item = ytConfig->AddClusterMapping();
+ item->SetName(cluster);
+ item->SetCluster(address);
+ if (cluster == options.DefaultCluster) {
+ item->SetDefault(true);
+ }
+
+ Clusters_.insert({item->GetName(), TString(NYql::YtProviderName)});
+ }
+ DefaultCluster_ = options.DefaultCluster;
+
+ NYql::TFileStorageConfig fileStorageConfig;
+ fileStorageConfig.SetMaxSizeMb(1 << 14);
+ FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
+
+ FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
+ NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();
+
+ const NKikimr::NMiniKQL::TUdfModuleRemappings emptyRemappings;
+
+ FuncRegistry_->SetBackTraceCallback(&NYql::NBacktrace::KikimrBackTrace);
+
+ NKikimr::NMiniKQL::TUdfModulePathsMap systemModules;
+
+ TVector<TString> udfPaths;
+ NKikimr::NMiniKQL::FindUdfsInDir(options.UdfDirectory, &udfPaths);
+ for (const auto& path: udfPaths) {
+ // Skip YQL plugin shared library itself, it is not a UDF.
+ if (path.EndsWith("libyqlplugin.so")) {
+ continue;
+ }
+ FuncRegistry_->LoadUdfs(path, emptyRemappings, 0);
+ }
+
+ for (auto& m: FuncRegistry_->GetAllModuleNames()) {
+ TMaybe<TString> path = FuncRegistry_->FindUdfPath(m);
+ if (!path) {
+ YQL_LOG(FATAL) << "Unable to detect UDF path for module " << m;
+ exit(1);
+ }
+ systemModules.emplace(m, *path);
+ }
+
+ FuncRegistry_->SetSystemModulePaths(systemModules);
+
+ NYql::TUserDataTable userDataTable = GetYqlModuleResolver(ExprContext_, ModuleResolver_, {}, Clusters_, {});
+
+ if (!userDataTable) {
+ TStringStream err;
+ ExprContext_.IssueManager
+ .GetIssues()
+ .PrintTo(err);
+ YQL_LOG(FATAL) << "Failed to compile modules:\n"
+ << err.Str();
+ exit(1);
+ }
+
+ OperationAttributes_ = options.OperationAttributes;
+
+ TVector<NYql::TDataProviderInitializer> dataProvidersInit;
+
+ NYql::TYtNativeServices ytServices;
+ ytServices.FunctionRegistry = FuncRegistry_.Get();
+ ytServices.FileStorage = FileStorage_;
+ ytServices.Config = std::make_shared<NYql::TYtGatewayConfig>(*ytConfig);
+ auto ytNativeGateway = CreateYtNativeGateway(ytServices);
+ dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
+
+ ProgramFactory_ = std::make_unique<NYql::TProgramFactory>(
+ false, FuncRegistry_.Get(), ExprContext_.NextUniqueId, dataProvidersInit, "embedded");
+ YTTokenPath_ = options.YTTokenPath;
+ ProgramFactory_->AddUserDataTable(userDataTable);
+ ProgramFactory_->SetModules(ModuleResolver_);
+ ProgramFactory_->SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_));
+ ProgramFactory_->SetGatewaysConfig(&GatewaysConfig_);
+ ProgramFactory_->SetFileStorage(FileStorage_);
+ ProgramFactory_->SetUrlPreprocessing(MakeIntrusive<NYql::TUrlPreprocessing>(GatewaysConfig_));
+ } catch (const std::exception& ex) {
+ YQL_LOG(FATAL) << "Unexpected exception while initializing YQL plugin: " << ex.what();
+ exit(1);
+ }
+ YQL_LOG(INFO) << "YQL plugin initialized";
+ }
+
+ TQueryResult GuardedRun(TString impersonationUser, TString queryText, TYsonString settings)
+ {
+ auto credentials = MakeIntrusive<NYql::TCredentials>();
+ if (YTTokenPath_) {
+ TFsPath path(YTTokenPath_);
+ auto token = TIFStream(path).ReadAll();
+
+ credentials->AddCredential("default_yt", NYql::TCredential("yt", "", token));
+ }
+
+ credentials->AddCredential("impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser));
+ ProgramFactory_->SetCredentials(credentials);
+
+ auto program = ProgramFactory_->Create("-memory-", queryText);
+ program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
+
+ NSQLTranslation::TTranslationSettings sqlSettings;
+ sqlSettings.ClusterMapping = Clusters_;
+ sqlSettings.ModuleMapping = Modules_;
+ if (DefaultCluster_) {
+ sqlSettings.DefaultCluster = *DefaultCluster_;
+ }
+ sqlSettings.SyntaxVersion = 1;
+ sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
+
+ if (!program->ParseSql(sqlSettings)) {
+ return TQueryResult{
+ .YsonError = IssuesToYtErrorYson(program->Issues()),
+ };
+ }
+
+ if (!program->Compile(GetUsername())) {
+ return TQueryResult{
+ .YsonError = IssuesToYtErrorYson(program->Issues()),
+ };
+ }
+
+ NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
+ status = program->Run(GetUsername(), nullptr, nullptr, nullptr);
+
+ if (status == NYql::TProgram::TStatus::Error) {
+ return TQueryResult{
+ .YsonError = IssuesToYtErrorYson(program->Issues()),
+ };
+ }
+
+ TStringStream result;
+ if (program->HasResults()) {
+ ::NYson::TYsonWriter yson(&result, EYsonFormat::Binary);
+ yson.OnBeginList();
+ for (const auto& result: program->Results()) {
+ yson.OnListItem();
+ yson.OnRaw(result);
+ }
+ yson.OnEndList();
+ }
+
+ auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
+ if (!maybeStr) {
+ return std::nullopt;
+ }
+ return *maybeStr;
+ };
+
+ return {
+ .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
+ .Plan = maybeToOptional(program->GetQueryPlan()),
+ .Statistics = maybeToOptional(program->GetStatistics()),
+ .TaskInfo = maybeToOptional(program->GetTasksInfo()),
+ };
+ }
+
+ TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept override
+ {
+ try {
+ return GuardedRun(impersonationUser, queryText, settings);
+ } catch (const std::exception& ex) {
+ return TQueryResult{
+ .YsonError = ExceptionToYtErrorYson(ex),
+ };
+ }
+ }
+
+private:
+ NYql::TFileStoragePtr FileStorage_;
+ NYql::TExprContext ExprContext_;
+ ::TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_;
+ NYql::IModuleResolver::TPtr ModuleResolver_;
+ NYql::TGatewaysConfig GatewaysConfig_;
+ std::unique_ptr<NYql::TProgramFactory> ProgramFactory_;
+ TString YTTokenPath_;
+ THashMap<TString, TString> Clusters_;
+ std::optional<TString> DefaultCluster_;
+ THashMap<TString, TString> Modules_;
+ THashSet<TString> Libraries_;
+ TYsonString OperationAttributes_;
+
+ TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
+ {
+ NYT::TNode querySettingsMap = NodeFromYsonString(querySettings.ToString());
+ NYT::TNode resultAttributesMap = NodeFromYsonString(configAttributes.ToString());
+
+ for (const auto& item: querySettingsMap.AsMap()) {
+ resultAttributesMap[item.first] = item.second;
+ }
+
+ return NodeToYsonString(resultAttributesMap);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NNative
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IYqlPlugin> CreateYqlPlugin(TYqlPluginOptions& options) noexcept
+{
+ return std::make_unique<NNative::TYqlPlugin>(options);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYqlPlugin