1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#include "yqlrun_lib.h"
#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/core/cbo/simple/cbo_simple.h>
#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
#include <util/generic/yexception.h>
#include <util/folder/iterator.h>
#include <util/folder/dirut.h>
#include <util/folder/path.h>
#include <util/stream/output.h>
namespace {
class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
public:
TPeepHolePipelineConfigurator() = default;
void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
Y_UNUSED(pipeline);
}
void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
Y_UNUSED(pipeline);
}
void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
}
};
TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
} // unnamed
namespace NYql {

// Sets up the "yqlrun" facade runner:
//  * deterministic run options (repeatable random/time providers), pretty YSON
//    results, and custom tests enabled;
//  * command-line options for table/dir/cluster mappings and temp-table handling;
//  * cluster mapping taken from the YT section of the gateways config, if given;
//  * the default "plato" -> yt cluster mapping;
//  * the YT data provider backed by the file gateway plus the simple CBO;
//  * the peephole pipeline configurator defined in this file.
TYqlRunTool::TYqlRunTool()
    : TFacadeRunner("yqlrun")
{
    GetRunOptions().UseRepeatableRandomAndTimeProviders = true;
    GetRunOptions().ResultsFormat = NYson::EYsonFormat::Pretty;
    GetRunOptions().CustomTests = true;

    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
        // -t / --table <table>@<file>: map one table name to a local file.
        // The handlers are stored in `opts` and run later, so they capture
        // only `this` explicitly rather than everything by reference.
        opts.AddLongOption('t', "table", "Table mapping").RequiredArgument("table@file")
            .KVHandler([this](TString name, TString path) {
                if (name.empty() || path.empty()) {
                    throw yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt";
                }
                TablesMapping_[name] = path;
            }, '@');

        // --tables-dir <cluster>@<dir>: remember the directory for the cluster and
        // register every "*.txt" file under it as table "<cluster>.<relative path
        // without the .txt extension>".
        opts.AddLongOption("tables-dir", "Table dirs mapping").RequiredArgument("cluster@dir")
            .KVHandler([this](TString cluster, TString dir) {
                if (cluster.empty() || dir.empty()) {
                    throw yexception() << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables";
                }
                TablesDirMapping_[cluster] = dir;
                for (const auto& entry: TDirIterator(TFsPath(dir))) {
                    if (auto entryPath = TFsPath(entry.fts_path); entryPath.IsFile() && entryPath.GetExtension() == "txt") {
                        auto tableName = TString(cluster).append('.').append(entryPath.RelativeTo(TFsPath(dir)).GetPath());
                        tableName = tableName.substr(0, tableName.size() - 4); // remove .txt extension
                        TablesMapping_[tableName] = entryPath.GetPath();
                    }
                }
            }, '@');

        // -C / --cluster <cluster>@<provider>: route a cluster to a provider.
        opts.AddLongOption('C', "cluster", "Cluster to service mapping").RequiredArgument("name@service")
            .KVHandler([this](TString cluster, TString provider) {
                if (cluster.empty() || provider.empty()) {
                    throw yexception() << "Incorrect service mapping, expected form cluster@provider, e.g. plato@yt";
                }
                AddClusterMapping(std::move(cluster), std::move(provider));
            }, '@');

        opts.AddLongOption("ndebug", "Do not show debug info in error output").NoArgument().SetFlag(&GetRunOptions().NoDebug);
        opts.AddLongOption("keep-temp", "Keep temporary tables").NoArgument().SetFlag(&KeepTemp_);

        // --show-progress: print each operation progress event to stderr.
        opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
            .Handler0([this]() {
                SetOperationProgressWriter([](const TOperationProgress& progress) {
                    Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n";
                });
            });

        opts.AddLongOption("tmp-dir", "Directory for temporary tables").RequiredArgument("DIR").StoreResult(&TmpDir_);
    });

    // After parsing: if a gateways config was supplied, derive cluster mappings
    // from its YT section. Bound by const reference — GetYt() is only read here,
    // so there is no need to copy the config message.
    GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
        Y_UNUSED(res);
        if (GetRunOptions().GatewaysConfig) {
            const auto& ytConfig = GetRunOptions().GatewaysConfig->GetYt();
            FillClusterMapping(ytConfig, TString{YtProviderName});
        }
    });

    GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
    GetRunOptions().GatewayTypes.emplace(YtProviderName);
    AddClusterMapping(TString{"plato"}, TString{YtProviderName});

    // The YT provider is built lazily from the file gateway and the CBO factory;
    // both are virtual so derived tools can substitute them.
    AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
        auto ytNativeGateway = CreateYtGateway();
        auto optimizerFactory = CreateCboFactory();
        return GetYtNativeDataProviderInitializer(ytNativeGateway, optimizerFactory, {});
    });

    SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
}

// Builds the file-backed YT gateway from the table/dir mappings, file storage,
// temp-table directory and keep-temp flag collected from the command line.
IYtGateway::TPtr TYqlRunTool::CreateYtGateway() {
    auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_);
    return CreateYtFileGateway(yqlNativeServices);
}

// Returns the simple cost-based optimizer factory used by the YT provider.
IOptimizerFactory::TPtr TYqlRunTool::CreateCboFactory() {
    return MakeSimpleCBOOptimizerFactory();
}

} // NYql
|