aboutsummaryrefslogtreecommitdiffstats
path: root/yql/tools/yqlrun/lib/yqlrun_lib.cpp
blob: f8b681a4564e9a6e77db39d6a3c19f083e2fe7db (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "yqlrun_lib.h"

#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
#include <yql/essentials/core/cbo/simple/cbo_simple.h>

#include <util/generic/yexception.h>
#include <util/folder/iterator.h>
#include <util/folder/dirut.h>
#include <util/folder/path.h>
#include <util/stream/output.h>

namespace {

class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
public:
    TPeepHolePipelineConfigurator() = default;

    void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
        Y_UNUSED(pipeline);
    }

    void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
        Y_UNUSED(pipeline);
    }

    void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
        pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
        pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
        pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
    }
};

TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;

} // unnamed

namespace NYql {

TYqlRunTool::TYqlRunTool()
    : TFacadeRunner("yqlrun")
{
    // The constructor does three things:
    //   * registers yqlrun-specific command-line options;
    //   * restricts the runner to the YT gateway;
    //   * installs a provider factory backed by local files.
    // Option handlers only record values in members (TablesMapping_,
    // TablesDirMapping_, KeepTemp_, TmpDir_, cluster mappings). The provider
    // factory reads those members when it is invoked, not when it is registered.
    auto& runOptions = GetRunOptions();
    runOptions.UseRepeatableRandomAndTimeProviders = true;
    runOptions.ResultsFormat = NYson::EYsonFormat::Pretty;

    // -t/--table table@file: map one table name to one local file.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption('t', "table", "Table mapping")
            .RequiredArgument("table@file")
            .KVHandler([this](TString table, TString file) {
                if (table.empty() || file.empty()) {
                    throw yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt";
                }
                TablesMapping_[table] = file;
            }, '@');
    });

    // --tables-dir cluster@dir: record the directory for the cluster. Also register
    // every regular "*.txt" file yielded by TDirIterator under dir as the table
    // "<cluster>.<path relative to dir, without .txt>".
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("tables-dir", "Table dirs mapping")
            .RequiredArgument("cluster@dir")
            .KVHandler([this](TString cluster, TString dir) {
                if (cluster.empty() || dir.empty()) {
                    throw yexception() << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables";
                }
                TablesDirMapping_[cluster] = dir;

                const TFsPath root(dir);
                for (const auto& entry : TDirIterator(root)) {
                    TFsPath file(entry.fts_path);
                    if (!file.IsFile() || file.GetExtension() != "txt") {
                        continue;
                    }
                    const TString relative = file.RelativeTo(root).GetPath();
                    // The extension check guarantees a trailing ".txt" (4 chars) to drop.
                    const TString stem = relative.substr(0, relative.size() - 4);
                    TablesMapping_[cluster + "." + stem] = file.GetPath();
                }
            }, '@');
    });

    // -C/--cluster name@service: route a cluster name to a provider.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption('C', "cluster", "Cluster to service mapping")
            .RequiredArgument("name@service")
            .KVHandler([this](TString clusterName, TString service) {
                if (clusterName.empty() || service.empty()) {
                    throw yexception() << "Incorrect service mapping, expected form cluster@provider, e.g. plato@yt";
                }
                AddClusterMapping(std::move(clusterName), std::move(service));
            }, '@');
    });

    // Plain boolean switches bound directly to run options / members.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("ndebug", "Do not show debug info in error output")
            .NoArgument()
            .SetFlag(&GetRunOptions().NoDebug);
    });
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("keep-temp", "Keep temporary tables")
            .NoArgument()
            .SetFlag(&KeepTemp_);
    });

    // --show-progress: install a writer that prints each progress event to stderr.
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("show-progress", "Report operation progress")
            .NoArgument()
            .Handler0([this]() {
                SetOperationProgressWriter([](const TOperationProgress& progress) {
                    Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n";
                });
            });
    });

    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("tmp-dir", "Directory for temporary tables")
            .RequiredArgument("DIR")
            .StoreResult(&TmpDir_);
    });

    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("test-format", "Compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)")
            .NoArgument()
            .SetFlag(&GetRunOptions().TestSqlFormat);
    });
    runOptions.AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("validate-result-format", "Check that result-format can parse Result")
            .NoArgument()
            .SetFlag(&GetRunOptions().ValidateResultFormat);
    });

    // YT is the only supported gateway; the "plato" cluster is mapped to it by default.
    runOptions.SetSupportedGateways({TString{YtProviderName}});
    runOptions.GatewayTypes.emplace(YtProviderName);
    AddClusterMapping(TString{"plato"}, TString{YtProviderName});

    // The YT provider is backed by file services built from the mappings and flags above.
    AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
        auto fileServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_);
        auto fileGateway = CreateYtFileGateway(fileServices);
        return GetYtNativeDataProviderInitializer(fileGateway, MakeSimpleCBOOptimizerFactory(), {});
    });

    SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
}


} // NYql