aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/py_helpers.cpp
blob: 3072449866ed67f7c5e7881810310f1ee8fcf753 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include "py_helpers.h"

#include "client.h"
#include "operation.h"
#include "transaction.h"

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/fluent.h>

#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/common/helpers.h>

#include <library/cpp/yson/node/node_io.h>

#include <util/generic/hash_set.h>

namespace NYT {

using namespace NDetail;

////////////////////////////////////////////////////////////////////////////////

// Creates the job registered in TJobFactory under `jobName`.
//
// When `state` is non-empty it is converted to a TNode with
// NodeFromYsonString and handed to the constructing function; otherwise the
// constructing function receives a default-constructed TNode.
IStructuredJobPtr ConstructJob(const TString& jobName, const TString& state)
{
    TNode stateNode = state.empty() ? TNode() : NodeFromYsonString(state);

    // The factory lookup happens only after the state has been parsed.
    auto&& construct = TJobFactory::Get()->GetConstructingFunction(jobName.data());
    return construct(stateNode);
}

// Returns whatever `job.Save` writes, collected into a string.
TString GetJobStateString(const IStructuredJob& job)
{
    TString serializedState;

    // Keep the stream in its own scope so that it is finished and destroyed
    // before the backing string is returned.
    {
        TStringOutput stateStream(serializedState);
        job.Save(stateStream);
        stateStream.Finish();
    }

    return serializedState;
}

// Converts a list node describing job tables into a TStructuredJobTableList.
//
// Every null entry of `node` stands for an intermediate table; every other
// entry is read as a string path. The string paths are passed through
// NRawClient::CanonizeYPaths using the context of `preparer`.
//
// Layout of the result: one intermediate table per null entry comes first,
// followed by one table per path returned by CanonizeYPaths. The relative
// position of null and non-null entries in `node` is therefore not kept.
TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOperationPreparer& preparer)
{
    TVector<TRichYPath> tablePaths;
    int nullEntryCount = 0;
    for (const auto& entry : node.AsList()) {
        if (!entry.IsNull()) {
            tablePaths.emplace_back(entry.AsString());
            continue;
        }
        ++nullEntryCount;
    }

    tablePaths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), tablePaths);

    // Start with the intermediate tables, then append the path-backed ones.
    TStructuredJobTableList tables(
        nullEntryCount,
        TStructuredJobTable::Intermediate(TUnspecifiedTableStructure()));
    for (const auto& canonizedPath : tablePaths) {
        tables.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), canonizedPath});
    }
    return tables;
}

// Computes the I/O description of `job` and returns it as a YSON string.
//
// Parameters:
//   job            - job passed to CreateSimpleOperationIoHelper.
//   options        - options used to create the client for `cluster`.
//   cluster        - cluster the client is created for.
//   transactionId  - converted with GetGuid and given to the operation preparer.
//   inputPaths     - YSON string holding a list of input tables
//                    (see NodeToStructuredTablePaths for the entry format).
//   outputPaths    - YSON string holding a list of output tables, same format.
//   neededColumns  - YSON string holding a list of column name strings.
//
// The returned map has the keys "input_format", "output_format",
// "input_table_paths", "output_table_paths" and "small_files"; the latter is
// a list of maps with "file_name" and "data" for each job file.
TString GetIOInfo(
    const IStructuredJob& job,
    const TCreateClientOptions& options,
    const TString& cluster,
    const TString& transactionId,
    const TString& inputPaths,
    const TString& outputPaths,
    const TString& neededColumns)
{
    auto clientImpl = NDetail::CreateClientImpl(cluster, options);
    TOperationPreparer preparer(clientImpl, GetGuid(transactionId));

    // Inputs are resolved before outputs, matching the argument order.
    auto inputTables = NodeToStructuredTablePaths(NodeFromYsonString(inputPaths), preparer);
    auto outputTables = NodeToStructuredTablePaths(NodeFromYsonString(outputPaths), preparer);

    THashSet<TString> usedColumns;
    auto columnListNode = NodeFromYsonString(neededColumns);
    for (const auto& columnEntry : columnListNode.AsList()) {
        usedColumns.insert(columnEntry.AsString());
    }

    auto ioHelper = CreateSimpleOperationIoHelper(
        job,
        preparer,
        TOperationOptions(),
        std::move(inputTables),
        std::move(outputTables),
        TUserJobFormatHints(),
        ENodeReaderFormat::Yson,
        usedColumns);

    return BuildYsonStringFluently()
        .BeginMap()
            .Item("input_format").Value(ioHelper.InputFormat.Config)
            .Item("output_format").Value(ioHelper.OutputFormat.Config)
            .Item("input_table_paths").List(ioHelper.Inputs)
            .Item("output_table_paths").List(ioHelper.Outputs)
            .Item("small_files").DoListFor(
                ioHelper.JobFiles.begin(),
                ioHelper.JobFiles.end(),
                [] (TFluentList listBuilder, auto jobFile) {
                    // `jobFile` is an iterator into JobFiles.
                    listBuilder.Item()
                        .BeginMap()
                            .Item("file_name").Value(jobFile->FileName)
                            .Item("data").Value(jobFile->Data)
                        .EndMap();
                })
        .EndMap();
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT