diff options
author | udovichenko-r <udovichenko-r@yandex-team.com> | 2025-02-05 00:38:16 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.com> | 2025-02-05 00:59:33 +0300 |
commit | 1d3a1b1e4201d7a3cbb150b881aeffc3535c2995 (patch) | |
tree | 5a770b50f8df5c16c8318688cfa97cd0f55c51db | |
parent | c1014edad0dcf5233c816eaf3397fad095dcc38e (diff) | |
download | ydb-1d3a1b1e4201d7a3cbb150b881aeffc3535c2995.tar.gz |
Change "devtools/contrib/piglet/projects/ydblib/config.yaml"
commit_hash:a068cc9fa90067fb04da7964b210b69110aa0062
-rw-r--r-- | yt/yql/providers/yt/gateway/fmr/ya.make | 14 | ||||
-rw-r--r-- | yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp | 27 | ||||
-rw-r--r-- | yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h | 9 | ||||
-rw-r--r-- | yt/yql/tools/mrjob/mrjob.cpp | 29 | ||||
-rw-r--r-- | yt/yql/tools/mrjob/test/test.py | 29 | ||||
-rw-r--r-- | yt/yql/tools/mrjob/test/ya.make | 10 | ||||
-rw-r--r-- | yt/yql/tools/mrjob/ya.make | 36 | ||||
-rw-r--r-- | yt/yql/tools/ytrun/lib/ya.make | 34 | ||||
-rw-r--r-- | yt/yql/tools/ytrun/lib/ytrun_lib.cpp | 223 | ||||
-rw-r--r-- | yt/yql/tools/ytrun/lib/ytrun_lib.h | 37 | ||||
-rw-r--r-- | yt/yql/tools/ytrun/ya.make | 35 | ||||
-rw-r--r-- | yt/yql/tools/ytrun/ytrun.cpp | 11 |
12 files changed, 494 insertions, 0 deletions
diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make new file mode 100644 index 0000000000..4f805b1aca --- /dev/null +++ b/yt/yql/providers/yt/gateway/fmr/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + yql_yt_fmr.cpp +) + +PEERDIR( + yql/essentials/utils/log + yt/yql/providers/yt/provider +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp new file mode 100644 index 0000000000..81d4224530 --- /dev/null +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -0,0 +1,27 @@ +#include "yql_yt_fmr.h" + +#include <yql/essentials/utils/log/profile.h> + +#include <util/generic/ptr.h> + +using namespace NThreading; + +namespace NYql { + +namespace { + +class TFmrYtGateway final: public TYtForwardingGatewayBase { +public: + TFmrYtGateway(IYtGateway::TPtr&& slave) + : TYtForwardingGatewayBase(std::move(slave)) + { + } +}; + +} // namespace + +IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave) { + return MakeIntrusive<TFmrYtGateway>(std::move(slave)); +} + +} // namespace NYql diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h new file mode 100644 index 0000000000..97e26b63b1 --- /dev/null +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h @@ -0,0 +1,9 @@ +#pragma once + +#include <yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h> + +namespace NYql { + +IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave); + +} // namespace NYql diff --git a/yt/yql/tools/mrjob/mrjob.cpp b/yt/yql/tools/mrjob/mrjob.cpp new file mode 100644 index 0000000000..410c44d5af --- /dev/null +++ b/yt/yql/tools/mrjob/mrjob.cpp @@ -0,0 +1,29 @@ +#include <yt/yql/providers/yt/job/yql_job_registry.h> + +#include <yql/essentials/utils/backtrace/backtrace.h> + +#include <yt/cpp/mapreduce/client/init.h> + +#include <util/system/yassert.h> +#include <util/system/mlock.h> + +int main(int argc, const char 
*argv[]) { + Y_UNUSED(NKikimr::NUdf::GetStaticSymbols()); + try { + LockAllMemory(LockCurrentMemory | LockFutureMemory); + } catch (yexception&) { + Cerr << "mlockall failed, but that's fine" << Endl; + } + + NYql::NBacktrace::RegisterKikimrFatalActions(); + NYql::NBacktrace::EnableKikimrSymbolize(); + + try { + NYT::Initialize(argc, argv); + } catch (...) { + Cerr << CurrentExceptionMessage(); + return -1; + } + + Y_ABORT("This binary should not be called directly"); +} diff --git a/yt/yql/tools/mrjob/test/test.py b/yt/yql/tools/mrjob/test/test.py new file mode 100644 index 0000000000..41d5387461 --- /dev/null +++ b/yt/yql/tools/mrjob/test/test.py @@ -0,0 +1,29 @@ +import os +import os.path +import re +import subprocess + +import yatest.common + + +def test_libc(): + mrjob_dir = yatest.common.binary_path('yt/yql/tools/mrjob') + mrjob_path = os.path.join(mrjob_dir, 'mrjob') + tools_path = os.path.dirname(yatest.common.cxx_compiler_path()) + nm_path = os.path.join(tools_path, 'llvm-nm') + readelf_path = os.path.join(tools_path, 'readelf') + if os.path.isfile(nm_path): + result = subprocess.check_output([nm_path, mrjob_path]) + elif os.path.isfile(readelf_path): + result = subprocess.check_output([readelf_path, '-a', mrjob_path]) + else: + assert False, 'neither llvm-nm nor readelf found, checked paths: %s' % str((readelf_path, nm_path)) + + glibc_tag_count = 0 + for line in result.decode().split('\n'): + glibc_tag = re.search(r'GLIBC_[0-9\.]+', line) + if glibc_tag: + glibc_tag_count += 1 + parts = glibc_tag.group().split('.') + assert len(parts) > 1 + assert int(parts[1]) <= 11 diff --git a/yt/yql/tools/mrjob/test/ya.make b/yt/yql/tools/mrjob/test/ya.make new file mode 100644 index 0000000000..f3122797f7 --- /dev/null +++ b/yt/yql/tools/mrjob/test/ya.make @@ -0,0 +1,10 @@ +IF(OS_LINUX) + PY3TEST() + TEST_SRCS(test.py) + + DEPENDS( + yt/yql/tools/mrjob + ) + + END() +ENDIF() diff --git a/yt/yql/tools/mrjob/ya.make b/yt/yql/tools/mrjob/ya.make new file mode 100644 
index 0000000000..4d88e6b5b5 --- /dev/null +++ b/yt/yql/tools/mrjob/ya.make @@ -0,0 +1,36 @@ +PROGRAM(mrjob) + +ALLOCATOR(J) + +SRCS( + mrjob.cpp +) + +IF (OS_LINUX) + # prevent external python extensions from looking up protobuf symbols (and maybe + # other common stuff) in the main binary + EXPORTS_SCRIPT(${ARCADIA_ROOT}/yql/essentials/tools/exports.symlist) +ENDIF() + +PEERDIR( + yt/cpp/mapreduce/client + yql/essentials/public/udf/service/terminate_policy + yql/essentials/providers/common/gateway + yql/essentials/utils/backtrace + yql/essentials/parser/pg_wrapper + yql/essentials/sql/pg + yt/yql/providers/yt/job + yt/yql/providers/yt/codec/codegen + yt/yql/providers/yt/comp_nodes/llvm16 + yql/essentials/minikql/computation/llvm16 + yql/essentials/minikql/invoke_builtins/llvm16 + yql/essentials/minikql/comp_nodes/llvm16 +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + test +) diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make new file mode 100644 index 0000000000..12bcaa20fd --- /dev/null +++ b/yt/yql/tools/ytrun/lib/ya.make @@ -0,0 +1,34 @@ +LIBRARY() + +SRCS( + ytrun_lib.cpp +) + +PEERDIR( + yt/yql/providers/yt/provider + yt/yql/providers/yt/gateway/native + yt/yql/providers/yt/gateway/fmr + yt/yql/providers/yt/lib/config_clusters + yt/yql/providers/yt/lib/yt_download + yt/yql/providers/yt/lib/yt_url_lister + yt/yql/providers/yt/lib/log + + yql/essentials/providers/common/provider + yql/essentials/core/cbo + yql/essentials/core/peephole_opt + yql/essentials/core/cbo/simple + yql/essentials/core/services + yql/essentials/utils/backtrace + yql/essentials/tools/yql_facade_run + + yt/cpp/mapreduce/client + yt/cpp/mapreduce/interface + + library/cpp/digest/md5 + library/cpp/malloc/api + library/cpp/sighandler +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp new file mode 100644 index 0000000000..9a4b5d0377 --- /dev/null +++ 
b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -0,0 +1,223 @@ +#include "ytrun_lib.h" + +#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h> +#include <yt/yql/providers/yt/provider/yql_yt_provider.h> +#include <yt/yql/providers/yt/lib/config_clusters/config_clusters.h> +#include <yt/yql/providers/yt/lib/yt_download/yt_download.h> +#include <yt/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h> +#include <yt/yql/providers/yt/lib/log/yt_logger.h> +#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h> +#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> +#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h> +#include <yql/essentials/core/services/yql_transform_pipeline.h> +#include <yql/essentials/core/cbo/simple/cbo_simple.h> +#include <yql/essentials/utils/backtrace/backtrace.h> + +#include <yt/cpp/mapreduce/client/init.h> +#include <yt/cpp/mapreduce/interface/config.h> + +#include <library/cpp/digest/md5/md5.h> +#include <library/cpp/malloc/api/malloc.h> +#include <library/cpp/sighandler/async_signals_handler.h> + +#include <util/folder/path.h> +#include <util/stream/file.h> + +namespace { + +class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator { +public: + TPeepHolePipelineConfigurator() = default; + + void AfterCreate(NYql::TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final { + pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow"); + pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput"); + pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); + pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput"); + } +}; + 
+TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE; + +void FlushYtDebugLogOnSignal() { + if (!NMalloc::IsAllocatorCorrupted) { + NYql::FlushYtDebugLog(); + } +} + +} // unnamed + +namespace NYql { + +TYtRunTool::TYtRunTool(TString name) + : TFacadeRunner(std::move(name)) +{ + GetRunOptions().EnableResultPosition = true; + GetRunOptions().EnableCredentials = true; + GetRunOptions().EnableQPlayer = true; + GetRunOptions().ResultStream = &Cout; + + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("user", "MR user") + .Optional() + .RequiredArgument("USER") + .StoreResult(&GetRunOptions().User); + + opts.AddLongOption("mrjob-bin", "Path to mrjob binary") + .Optional() + .StoreResult(&MrJobBin_); + opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs") + .Optional() + .StoreResult(&MrJobUdfsDir_); + opts.AddLongOption("show-progress", "Report operation progress").NoArgument() + .Handler0([&]() { + SetOperationProgressWriter([](const TOperationProgress& progress) { + TStringBuilder remoteId; + if (progress.RemoteId) { + remoteId << ", remoteId: " << progress.RemoteId; + } + TStringBuilder counters; + if (progress.Counters) { + if (progress.Counters->Running) { + counters << ' ' << progress.Counters->Running; + } + if (progress.Counters->Total) { + counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)"); + } + } + Cerr << "Operation: [" << progress.Category << "] " << progress.Id + << ", state: " << progress.State << remoteId << counters + << ", current stage: " << progress.Stage.first << Endl; + }); + }); + opts.AddLongOption("threads", "gateway threads") + .Optional() + .RequiredArgument("COUNT") + .StoreResult(&NumThreads_); + opts.AddLongOption("keep-temp", "keep temporary tables") + .Optional() + .NoArgument() + .SetFlag(&KeepTemp_); + opts.AddLongOption("use-graph-meta", "Use tables metadata from graph") + .Optional() + .NoArgument() + 
.SetFlag(&GetRunOptions().UseMetaFromGrpah); + }); + + GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) { + Y_UNUSED(res); + + if (!GetRunOptions().GatewaysConfig) { + GetRunOptions().GatewaysConfig = MakeHolder<TGatewaysConfig>(); + } + + auto ytConfig = GetRunOptions().GatewaysConfig->MutableYt(); + ytConfig->SetGatewayThreads(NumThreads_); + if (MrJobBin_.empty()) { + ytConfig->ClearMrJobBin(); + } else { + ytConfig->SetMrJobBin(MrJobBin_); + ytConfig->SetMrJobBinMd5(MD5::File(MrJobBin_)); + } + + if (MrJobUdfsDir_.empty()) { + ytConfig->ClearMrJobUdfsDir(); + } else { + ytConfig->SetMrJobUdfsDir(MrJobUdfsDir_); + } + auto attr = ytConfig->MutableDefaultSettings()->Add(); + attr->SetName("KeepTempTables"); + attr->SetValue(KeepTemp_ ? "yes" : "no"); + + FillClusterMapping(*ytConfig, TString{YtProviderName}); + + DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig); + }); + + GetRunOptions().SetSupportedGateways({TString{YtProviderName}}); + GetRunOptions().GatewayTypes.emplace(YtProviderName); + + AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr { + return MakeYtDownloader(*GetRunOptions().FsConfig, DefYtServer_); + }); + + AddUrlListerFactory([]() -> IUrlListerPtr { + return MakeYtUrlLister(); + }); + + AddProviderFactory([this]() -> NYql::TDataProviderInitializer { + if (GetRunOptions().GatewayTypes.contains(YtProviderName) && GetRunOptions().GatewaysConfig->HasYt()) { + return GetYtNativeDataProviderInitializer(CreateYtGateway(), CreateCboFactory(), CreateDqHelper()); + } + return {}; + }); + + SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE); +} + +IYtGateway::TPtr TYtRunTool::CreateYtGateway() { + TYtNativeServices services; + services.FunctionRegistry = GetFuncRegistry().Get(); + services.FileStorage = GetFileStorage(); + services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt()); + auto ytGateway = CreateYtNativeGateway(services); + return 
GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName) ? CreateYtFmrGateway(ytGateway): ytGateway; +} + +IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() { + return MakeSimpleCBOOptimizerFactory(); +} + +IDqHelper::TPtr TYtRunTool::CreateDqHelper() { + return {}; +} + +int TYtRunTool::DoMain(int argc, const char *argv[]) { + // Init MR/YT for proper work of embedded agent + NYT::Initialize(argc, argv); + + NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); }); + NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY); + + if (NYT::TConfig::Get()->Prefix.empty()) { + NYT::TConfig::Get()->Prefix = "//"; + } + + int res = TFacadeRunner::DoMain(argc, argv); + if (0 == res) { + NYql::DropYtDebugLog(); + } + return res; +} + +TProgram::TStatus TYtRunTool::DoRunProgram(TProgramPtr program) { + auto sigHandler = [program](int) { + Cerr << "Aborting..." << Endl; + try { + program->Abort().GetValueSync(); + } catch (...) { + Cerr << CurrentExceptionMessage(); + } + }; + SetAsyncSignalFunction(SIGINT, sigHandler); + SetAsyncSignalFunction(SIGTERM, sigHandler); + + TProgram::TStatus status = TFacadeRunner::DoRunProgram(program); + + auto dummySigHandler = [](int) { }; + SetAsyncSignalFunction(SIGINT, dummySigHandler); + SetAsyncSignalFunction(SIGTERM, dummySigHandler); + + return status; +} + +} // NYql diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h new file mode 100644 index 0000000000..de3c0dfd64 --- /dev/null +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h @@ -0,0 +1,37 @@ +#pragma once + +#include <yt/yql/providers/yt/provider/yql_yt_gateway.h> + +#include <yql/essentials/tools/yql_facade_run/yql_facade_run.h> +#include <yql/essentials/core/cbo/cbo_optimizer_new.h> +#include <yql/essentials/core/dq_integration/yql_dq_helper.h> + +#include <util/generic/string.h> +#include <util/generic/hash.h> + +namespace NYql { + +constexpr TStringBuf FastMapReduceGatewayName = "fmr"; + +class TYtRunTool: public 
TFacadeRunner { +public: + TYtRunTool(TString name = "ytrun"); + ~TYtRunTool() = default; + +protected: + int DoMain(int argc, const char *argv[]) override; + TProgram::TStatus DoRunProgram(TProgramPtr program) override; + + virtual IYtGateway::TPtr CreateYtGateway(); + virtual IOptimizerFactory::TPtr CreateCboFactory(); + virtual IDqHelper::TPtr CreateDqHelper(); + +protected: + TString MrJobBin_; + TString MrJobUdfsDir_; + size_t NumThreads_ = 1; + bool KeepTemp_ = false; + TString DefYtServer_; +}; + +} // NYql diff --git a/yt/yql/tools/ytrun/ya.make b/yt/yql/tools/ytrun/ya.make new file mode 100644 index 0000000000..d1b9a412a6 --- /dev/null +++ b/yt/yql/tools/ytrun/ya.make @@ -0,0 +1,35 @@ +PROGRAM() + +ALLOCATOR(J) + +SRCS( + ytrun.cpp +) + +IF (OS_LINUX) + # prevent external python extensions from looking up protobuf symbols (and maybe + # other common stuff) in the main binary + EXPORTS_SCRIPT(${ARCADIA_ROOT}/yql/essentials/tools/exports.symlist) +ENDIF() + +PEERDIR( + yt/yql/tools/ytrun/lib + + yt/yql/providers/yt/codec/codegen + yt/yql/providers/yt/comp_nodes/llvm16 + yql/essentials/minikql/invoke_builtins/llvm16 + yql/essentials/minikql/comp_nodes/llvm16 + yql/essentials/parser/pg_wrapper + yql/essentials/public/udf/service/terminate_policy + yql/essentials/sql/pg +) + +YQL_LAST_ABI_VERSION() + +RESOURCE( + yql/essentials/cfg/tests/gateways.conf gateways.conf + yql/essentials/cfg/tests/fs.conf fs.conf + yql/essentials/cfg/tests/fs_arc.conf fs_arc.conf +) + +END() diff --git a/yt/yql/tools/ytrun/ytrun.cpp b/yt/yql/tools/ytrun/ytrun.cpp new file mode 100644 index 0000000000..d405eb419e --- /dev/null +++ b/yt/yql/tools/ytrun/ytrun.cpp @@ -0,0 +1,11 @@ +#include <yt/yql/tools/ytrun/lib/ytrun_lib.h> + +int main(int argc, const char *argv[]) { + try { + return NYql::TYtRunTool().Main(argc, argv); + } + catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + return 1; + } +} |