aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.com>2025-02-05 00:38:16 +0300
committerudovichenko-r <udovichenko-r@yandex-team.com>2025-02-05 00:59:33 +0300
commit1d3a1b1e4201d7a3cbb150b881aeffc3535c2995 (patch)
tree5a770b50f8df5c16c8318688cfa97cd0f55c51db
parentc1014edad0dcf5233c816eaf3397fad095dcc38e (diff)
downloadydb-1d3a1b1e4201d7a3cbb150b881aeffc3535c2995.tar.gz
Change "devtools/contrib/piglet/projects/ydblib/config.yaml"
commit_hash:a068cc9fa90067fb04da7964b210b69110aa0062
-rw-r--r--yt/yql/providers/yt/gateway/fmr/ya.make14
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp27
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h9
-rw-r--r--yt/yql/tools/mrjob/mrjob.cpp29
-rw-r--r--yt/yql/tools/mrjob/test/test.py29
-rw-r--r--yt/yql/tools/mrjob/test/ya.make10
-rw-r--r--yt/yql/tools/mrjob/ya.make36
-rw-r--r--yt/yql/tools/ytrun/lib/ya.make34
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp223
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.h37
-rw-r--r--yt/yql/tools/ytrun/ya.make35
-rw-r--r--yt/yql/tools/ytrun/ytrun.cpp11
12 files changed, 494 insertions, 0 deletions
diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make
new file mode 100644
index 0000000000..4f805b1aca
--- /dev/null
+++ b/yt/yql/providers/yt/gateway/fmr/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_fmr.cpp
+)
+
+PEERDIR(
+ yql/essentials/utils/log
+ yt/yql/providers/yt/provider
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
new file mode 100644
index 0000000000..81d4224530
--- /dev/null
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -0,0 +1,27 @@
+#include "yql_yt_fmr.h"
+
+#include <yql/essentials/utils/log/profile.h>
+
+#include <util/generic/ptr.h>
+
+using namespace NThreading;
+
+namespace NYql {
+
+namespace {
+
+class TFmrYtGateway final: public TYtForwardingGatewayBase {
+public:
+ TFmrYtGateway(IYtGateway::TPtr&& slave)
+ : TYtForwardingGatewayBase(std::move(slave))
+ {
+ }
+};
+
+} // namespace
+
+IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave) {
+ return MakeIntrusive<TFmrYtGateway>(std::move(slave));
+}
+
+} // namspace NYql
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
new file mode 100644
index 0000000000..97e26b63b1
--- /dev/null
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h>
+
+namespace NYql {
+
+IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave);
+
+} // namspace NYql
diff --git a/yt/yql/tools/mrjob/mrjob.cpp b/yt/yql/tools/mrjob/mrjob.cpp
new file mode 100644
index 0000000000..410c44d5af
--- /dev/null
+++ b/yt/yql/tools/mrjob/mrjob.cpp
@@ -0,0 +1,29 @@
+#include <yt/yql/providers/yt/job/yql_job_registry.h>
+
+#include <yql/essentials/utils/backtrace/backtrace.h>
+
+#include <yt/cpp/mapreduce/client/init.h>
+
+#include <util/system/yassert.h>
+#include <util/system/mlock.h>
+
+int main(int argc, const char *argv[]) {
+ Y_UNUSED(NKikimr::NUdf::GetStaticSymbols());
+ try {
+ LockAllMemory(LockCurrentMemory | LockFutureMemory);
+ } catch (yexception&) {
+ Cerr << "mlockall failed, but that's fine" << Endl;
+ }
+
+ NYql::NBacktrace::RegisterKikimrFatalActions();
+ NYql::NBacktrace::EnableKikimrSymbolize();
+
+ try {
+ NYT::Initialize(argc, argv);
+ } catch (...) {
+ Cerr << CurrentExceptionMessage();
+ return -1;
+ }
+
+ Y_ABORT("This binary should not be called directly");
+}
diff --git a/yt/yql/tools/mrjob/test/test.py b/yt/yql/tools/mrjob/test/test.py
new file mode 100644
index 0000000000..41d5387461
--- /dev/null
+++ b/yt/yql/tools/mrjob/test/test.py
@@ -0,0 +1,29 @@
+import os
+import os.path
+import re
+import subprocess
+
+import yatest.common
+
+
+def test_libc():
+ mrjob_dir = yatest.common.binary_path('yt/yql/tools/mrjob')
+ mrjob_path = os.path.join(mrjob_dir, 'mrjob')
+ tools_path = os.path.dirname(yatest.common.cxx_compiler_path())
+ nm_path = os.path.join(tools_path, 'llvm-nm')
+ readelf_path = os.path.join(tools_path, 'readelf')
+ if os.path.isfile(nm_path):
+ result = subprocess.check_output([nm_path, mrjob_path])
+ elif os.path.isfile(readelf_path):
+ result = subprocess.check_output([readelf_path, '-a', mrjob_path])
+ else:
+ assert False, 'neither llvm-nm nor readelf found, checked paths: %s' % str((readelf_path, nm_path))
+
+ glibc_tag_count = 0
+ for line in result.decode().split('\n'):
+ glibc_tag = re.search(r'GLIBC_[0-9\.]+', line)
+ if glibc_tag:
+ glibc_tag_count += 1
+ parts = glibc_tag.group().split('.')
+ assert len(parts) > 1
+ assert int(parts[1]) <= 11
diff --git a/yt/yql/tools/mrjob/test/ya.make b/yt/yql/tools/mrjob/test/ya.make
new file mode 100644
index 0000000000..f3122797f7
--- /dev/null
+++ b/yt/yql/tools/mrjob/test/ya.make
@@ -0,0 +1,10 @@
+IF(OS_LINUX)
+ PY3TEST()
+ TEST_SRCS(test.py)
+
+ DEPENDS(
+ yt/yql/tools/mrjob
+ )
+
+ END()
+ENDIF()
diff --git a/yt/yql/tools/mrjob/ya.make b/yt/yql/tools/mrjob/ya.make
new file mode 100644
index 0000000000..4d88e6b5b5
--- /dev/null
+++ b/yt/yql/tools/mrjob/ya.make
@@ -0,0 +1,36 @@
+PROGRAM(mrjob)
+
+ALLOCATOR(J)
+
+SRCS(
+ mrjob.cpp
+)
+
+IF (OS_LINUX)
+ # prevent external python extensions to lookup protobuf symbols (and maybe
+ # other common stuff) in main binary
+ EXPORTS_SCRIPT(${ARCADIA_ROOT}/yql/essentials/tools/exports.symlist)
+ENDIF()
+
+PEERDIR(
+ yt/cpp/mapreduce/client
+ yql/essentials/public/udf/service/terminate_policy
+ yql/essentials/providers/common/gateway
+ yql/essentials/utils/backtrace
+ yql/essentials/parser/pg_wrapper
+ yql/essentials/sql/pg
+ yt/yql/providers/yt/job
+ yt/yql/providers/yt/codec/codegen
+ yt/yql/providers/yt/comp_nodes/llvm16
+ yql/essentials/minikql/computation/llvm16
+ yql/essentials/minikql/invoke_builtins/llvm16
+ yql/essentials/minikql/comp_nodes/llvm16
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ test
+)
diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make
new file mode 100644
index 0000000000..12bcaa20fd
--- /dev/null
+++ b/yt/yql/tools/ytrun/lib/ya.make
@@ -0,0 +1,34 @@
+LIBRARY()
+
+SRCS(
+ ytrun_lib.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/provider
+ yt/yql/providers/yt/gateway/native
+ yt/yql/providers/yt/gateway/fmr
+ yt/yql/providers/yt/lib/config_clusters
+ yt/yql/providers/yt/lib/yt_download
+ yt/yql/providers/yt/lib/yt_url_lister
+ yt/yql/providers/yt/lib/log
+
+ yql/essentials/providers/common/provider
+ yql/essentials/core/cbo
+ yql/essentials/core/peephole_opt
+ yql/essentials/core/cbo/simple
+ yql/essentials/core/services
+ yql/essentials/utils/backtrace
+ yql/essentials/tools/yql_facade_run
+
+ yt/cpp/mapreduce/client
+ yt/cpp/mapreduce/interface
+
+ library/cpp/digest/md5
+ library/cpp/malloc/api
+ library/cpp/sighandler
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
new file mode 100644
index 0000000000..9a4b5d0377
--- /dev/null
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -0,0 +1,223 @@
+#include "ytrun_lib.h"
+
+#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
+#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
+#include <yt/yql/providers/yt/lib/config_clusters/config_clusters.h>
+#include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
+#include <yt/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h>
+#include <yt/yql/providers/yt/lib/log/yt_logger.h>
+#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
+#include <yql/essentials/core/services/yql_transform_pipeline.h>
+#include <yql/essentials/core/cbo/simple/cbo_simple.h>
+#include <yql/essentials/utils/backtrace/backtrace.h>
+
+#include <yt/cpp/mapreduce/client/init.h>
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/malloc/api/malloc.h>
+#include <library/cpp/sighandler/async_signals_handler.h>
+
+#include <util/folder/path.h>
+#include <util/stream/file.h>
+
+namespace {
+
+class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
+public:
+ TPeepHolePipelineConfigurator() = default;
+
+ void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
+ Y_UNUSED(pipeline);
+ }
+
+ void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
+ Y_UNUSED(pipeline);
+ }
+
+ void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
+ pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
+ pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
+ pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
+ pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
+ }
+};
+
+TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
+
+void FlushYtDebugLogOnSignal() {
+ if (!NMalloc::IsAllocatorCorrupted) {
+ NYql::FlushYtDebugLog();
+ }
+}
+
+} // unnamed
+
+namespace NYql {
+
+TYtRunTool::TYtRunTool(TString name)
+ : TFacadeRunner(std::move(name))
+{
+ GetRunOptions().EnableResultPosition = true;
+ GetRunOptions().EnableCredentials = true;
+ GetRunOptions().EnableQPlayer = true;
+ GetRunOptions().ResultStream = &Cout;
+
+ GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+ opts.AddLongOption("user", "MR user")
+ .Optional()
+ .RequiredArgument("USER")
+ .StoreResult(&GetRunOptions().User);
+
+ opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
+ .Optional()
+ .StoreResult(&MrJobBin_);
+ opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
+ .Optional()
+ .StoreResult(&MrJobUdfsDir_);
+ opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
+ .Handler0([&]() {
+ SetOperationProgressWriter([](const TOperationProgress& progress) {
+ TStringBuilder remoteId;
+ if (progress.RemoteId) {
+ remoteId << ", remoteId: " << progress.RemoteId;
+ }
+ TStringBuilder counters;
+ if (progress.Counters) {
+ if (progress.Counters->Running) {
+ counters << ' ' << progress.Counters->Running;
+ }
+ if (progress.Counters->Total) {
+ counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)");
+ }
+ }
+ Cerr << "Operation: [" << progress.Category << "] " << progress.Id
+ << ", state: " << progress.State << remoteId << counters
+ << ", current stage: " << progress.Stage.first << Endl;
+ });
+ });
+ opts.AddLongOption("threads", "gateway threads")
+ .Optional()
+ .RequiredArgument("COUNT")
+ .StoreResult(&NumThreads_);
+ opts.AddLongOption("keep-temp", "keep temporary tables")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&KeepTemp_);
+ opts.AddLongOption("use-graph-meta", "Use tables metadata from graph")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&GetRunOptions().UseMetaFromGrpah);
+ });
+
+ GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
+ Y_UNUSED(res);
+
+ if (!GetRunOptions().GatewaysConfig) {
+ GetRunOptions().GatewaysConfig = MakeHolder<TGatewaysConfig>();
+ }
+
+ auto ytConfig = GetRunOptions().GatewaysConfig->MutableYt();
+ ytConfig->SetGatewayThreads(NumThreads_);
+ if (MrJobBin_.empty()) {
+ ytConfig->ClearMrJobBin();
+ } else {
+ ytConfig->SetMrJobBin(MrJobBin_);
+ ytConfig->SetMrJobBinMd5(MD5::File(MrJobBin_));
+ }
+
+ if (MrJobUdfsDir_.empty()) {
+ ytConfig->ClearMrJobUdfsDir();
+ } else {
+ ytConfig->SetMrJobUdfsDir(MrJobUdfsDir_);
+ }
+ auto attr = ytConfig->MutableDefaultSettings()->Add();
+ attr->SetName("KeepTempTables");
+ attr->SetValue(KeepTemp_ ? "yes" : "no");
+
+ FillClusterMapping(*ytConfig, TString{YtProviderName});
+
+ DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig);
+ });
+
+ GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
+ GetRunOptions().GatewayTypes.emplace(YtProviderName);
+
+ AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr {
+ return MakeYtDownloader(*GetRunOptions().FsConfig, DefYtServer_);
+ });
+
+ AddUrlListerFactory([]() -> IUrlListerPtr {
+ return MakeYtUrlLister();
+ });
+
+ AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
+ if (GetRunOptions().GatewayTypes.contains(YtProviderName) && GetRunOptions().GatewaysConfig->HasYt()) {
+ return GetYtNativeDataProviderInitializer(CreateYtGateway(), CreateCboFactory(), CreateDqHelper());
+ }
+ return {};
+ });
+
+ SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
+}
+
+IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
+ TYtNativeServices services;
+ services.FunctionRegistry = GetFuncRegistry().Get();
+ services.FileStorage = GetFileStorage();
+ services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
+ auto ytGateway = CreateYtNativeGateway(services);
+ return GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName) ? CreateYtFmrGateway(ytGateway): ytGateway;
+}
+
+IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
+ return MakeSimpleCBOOptimizerFactory();
+}
+
+IDqHelper::TPtr TYtRunTool::CreateDqHelper() {
+ return {};
+}
+
+int TYtRunTool::DoMain(int argc, const char *argv[]) {
+ // Init MR/YT for proper work of embedded agent
+ NYT::Initialize(argc, argv);
+
+ NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); });
+ NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY);
+
+ if (NYT::TConfig::Get()->Prefix.empty()) {
+ NYT::TConfig::Get()->Prefix = "//";
+ }
+
+ int res = TFacadeRunner::DoMain(argc, argv);
+ if (0 == res) {
+ NYql::DropYtDebugLog();
+ }
+ return res;
+}
+
+TProgram::TStatus TYtRunTool::DoRunProgram(TProgramPtr program) {
+ auto sigHandler = [program](int) {
+ Cerr << "Aborting..." << Endl;
+ try {
+ program->Abort().GetValueSync();
+ } catch (...) {
+ Cerr << CurrentExceptionMessage();
+ }
+ };
+ SetAsyncSignalFunction(SIGINT, sigHandler);
+ SetAsyncSignalFunction(SIGTERM, sigHandler);
+
+ TProgram::TStatus status = TFacadeRunner::DoRunProgram(program);
+
+ auto dummySigHandler = [](int) { };
+ SetAsyncSignalFunction(SIGINT, dummySigHandler);
+ SetAsyncSignalFunction(SIGTERM, dummySigHandler);
+
+ return status;
+}
+
+} // NYql
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h
new file mode 100644
index 0000000000..de3c0dfd64
--- /dev/null
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
+
+#include <yql/essentials/tools/yql_facade_run/yql_facade_run.h>
+#include <yql/essentials/core/cbo/cbo_optimizer_new.h>
+#include <yql/essentials/core/dq_integration/yql_dq_helper.h>
+
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+
+namespace NYql {
+
+constexpr TStringBuf FastMapReduceGatewayName = "fmr";
+
+class TYtRunTool: public TFacadeRunner {
+public:
+ TYtRunTool(TString name = "ytrun");
+ ~TYtRunTool() = default;
+
+protected:
+ int DoMain(int argc, const char *argv[]) override;
+ TProgram::TStatus DoRunProgram(TProgramPtr program) override;
+
+ virtual IYtGateway::TPtr CreateYtGateway();
+ virtual IOptimizerFactory::TPtr CreateCboFactory();
+ virtual IDqHelper::TPtr CreateDqHelper();
+
+protected:
+ TString MrJobBin_;
+ TString MrJobUdfsDir_;
+ size_t NumThreads_ = 1;
+ bool KeepTemp_ = false;
+ TString DefYtServer_;
+};
+
+} // NYql
diff --git a/yt/yql/tools/ytrun/ya.make b/yt/yql/tools/ytrun/ya.make
new file mode 100644
index 0000000000..d1b9a412a6
--- /dev/null
+++ b/yt/yql/tools/ytrun/ya.make
@@ -0,0 +1,35 @@
+PROGRAM()
+
+ALLOCATOR(J)
+
+SRCS(
+ ytrun.cpp
+)
+
+IF (OS_LINUX)
+ # prevent external python extensions to lookup protobuf symbols (and maybe
+ # other common stuff) in main binary
+ EXPORTS_SCRIPT(${ARCADIA_ROOT}/yql/essentials/tools/exports.symlist)
+ENDIF()
+
+PEERDIR(
+ yt/yql/tools/ytrun/lib
+
+ yt/yql/providers/yt/codec/codegen
+ yt/yql/providers/yt/comp_nodes/llvm16
+ yql/essentials/minikql/invoke_builtins/llvm16
+ yql/essentials/minikql/comp_nodes/llvm16
+ yql/essentials/parser/pg_wrapper
+ yql/essentials/public/udf/service/terminate_policy
+ yql/essentials/sql/pg
+)
+
+YQL_LAST_ABI_VERSION()
+
+RESOURCE(
+ yql/essentials/cfg/tests/gateways.conf gateways.conf
+ yql/essentials/cfg/tests/fs.conf fs.conf
+ yql/essentials/cfg/tests/fs_arc.conf fs_arc.conf
+)
+
+END()
diff --git a/yt/yql/tools/ytrun/ytrun.cpp b/yt/yql/tools/ytrun/ytrun.cpp
new file mode 100644
index 0000000000..d405eb419e
--- /dev/null
+++ b/yt/yql/tools/ytrun/ytrun.cpp
@@ -0,0 +1,11 @@
+#include <yt/yql/tools/ytrun/lib/ytrun_lib.h>
+
+int main(int argc, const char *argv[]) {
+ try {
+ return NYql::TYtRunTool().Main(argc, argv);
+ }
+ catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ return 1;
+ }
+}