1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
#include "yqlrun_lib.h"
#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
#include <yql/essentials/core/cbo/simple/cbo_simple.h>
#include <util/generic/yexception.h>
#include <util/folder/iterator.h>
#include <util/folder/dirut.h>
#include <util/folder/path.h>
#include <util/stream/output.h>
namespace {
class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
public:
TPeepHolePipelineConfigurator() = default;
void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
Y_UNUSED(pipeline);
}
void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
Y_UNUSED(pipeline);
}
void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
}
};
// File-local configurator instance; its address is passed to
// SetPeepholePipelineConfigurator() in the TYqlRunTool constructor.
TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
} // unnamed
namespace NYql {
/// Sets up the "yqlrun" facade runner.
///
/// The constructor:
///  - switches on repeatable random/time providers and pretty YSON results;
///  - registers the tool-specific command-line options (table and directory
///    mappings, cluster mapping, debug/progress/temp-table switches, format checks);
///  - declares the YT provider as the only supported gateway and maps the
///    "plato" cluster to it;
///  - installs a provider factory that builds a file-based YT gateway;
///  - installs the file-local peephole pipeline configurator.
///
/// Option handlers throw yexception when a key@value argument has an empty part.
TYqlRunTool::TYqlRunTool()
    : TFacadeRunner("yqlrun")
{
    auto& runOptions = GetRunOptions();

    runOptions.UseRepeatableRandomAndTimeProviders = true;
    runOptions.ResultsFormat = NYson::EYsonFormat::Pretty;

    // -t / --table: explicit "table@file" mapping.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption('t', "table", "Table mapping")
            .RequiredArgument("table@file")
            .KVHandler([this](TString table, TString file) {
                const bool wellFormed = !table.empty() && !file.empty();
                if (!wellFormed) {
                    throw yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt";
                }
                TablesMapping_[table] = file;
            }, '@');
    });

    // --tables-dir: "cluster@dir" mapping; every *.txt file yielded by the
    // directory iterator is additionally registered as an individual table.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption("tables-dir", "Table dirs mapping")
            .RequiredArgument("cluster@dir")
            .KVHandler([this](TString cluster, TString dir) {
                const bool wellFormed = !cluster.empty() && !dir.empty();
                if (!wellFormed) {
                    throw yexception() << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables";
                }
                TablesDirMapping_[cluster] = dir;

                const TFsPath root(dir);
                for (const auto& item : TDirIterator(root)) {
                    TFsPath itemPath(item.fts_path);
                    if (!itemPath.IsFile() || itemPath.GetExtension() != "txt") {
                        continue;
                    }

                    // Table name is "<cluster>.<path relative to dir>" ...
                    TString qualifiedName = cluster;
                    qualifiedName.append('.');
                    qualifiedName.append(itemPath.RelativeTo(root).GetPath());
                    // ... with the trailing ".txt" (4 characters) cut off.
                    qualifiedName = qualifiedName.substr(0, qualifiedName.size() - 4);

                    TablesMapping_[qualifiedName] = itemPath.GetPath();
                }
            }, '@');
    });

    // -C / --cluster: "name@service" mapping forwarded to AddClusterMapping().
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption('C', "cluster", "Cluster to service mapping")
            .RequiredArgument("name@service")
            .KVHandler([this](TString cluster, TString provider) {
                const bool wellFormed = !cluster.empty() && !provider.empty();
                if (!wellFormed) {
                    throw yexception() << "Incorrect service mapping, expected form cluster@provider, e.g. plato@yt";
                }
                AddClusterMapping(std::move(cluster), std::move(provider));
            }, '@');
    });

    // --ndebug: flag stored directly in the run options.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption("ndebug", "Do not show debug info in error output")
            .NoArgument()
            .SetFlag(&GetRunOptions().NoDebug);
    });

    // --keep-temp: flag stored in this tool; later passed to the file services.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption("keep-temp", "Keep temporary tables")
            .NoArgument()
            .SetFlag(&KeepTemp_);
    });

    // --show-progress: installs a writer that prints each progress record to Cerr.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption("show-progress", "Report operation progress")
            .NoArgument()
            .Handler0([this]() {
                SetOperationProgressWriter([](const TOperationProgress& progress) {
                    Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n";
                });
            });
    });

    // --tmp-dir: value stored in this tool; later passed to the file services.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption("tmp-dir", "Directory for temporary tables")
            .RequiredArgument("DIR")
            .StoreResult(&TmpDir_);
    });

    // --test-format: flag stored directly in the run options.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption("test-format", "Compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)")
            .NoArgument()
            .SetFlag(&GetRunOptions().TestSqlFormat);
    });

    // --validate-result-format: flag stored directly in the run options.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& options) {
        options.AddLongOption("validate-result-format", "Check that result-format can parse Result")
            .NoArgument()
            .SetFlag(&GetRunOptions().ValidateResultFormat);
    });

    // The YT provider is the only supported gateway; "plato" is mapped to it by default.
    runOptions.SetSupportedGateways({TString{YtProviderName}});
    runOptions.GatewayTypes.emplace(YtProviderName);
    AddClusterMapping(TString{"plato"}, TString{YtProviderName});

    // The factory is invoked later, so it reads the members filled in by the option handlers.
    AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
        auto fileServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_);
        auto fileGateway = CreateYtFileGateway(fileServices);
        return GetYtNativeDataProviderInitializer(fileGateway, MakeSimpleCBOOptimizerFactory(), {});
    });

    SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
}
} // NYql
|