aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/common/helpers.cpp
blob: 95924d812c9f394a435c66935e9c3629877eb521 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#include "helpers.h"

#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/serialize.h>
#include <yt/cpp/mapreduce/interface/fluent.h>

#include <library/cpp/yson/node/node_builder.h>
#include <library/cpp/yson/node/node_visitor.h>

#include <library/cpp/yson/parser.h>
#include <library/cpp/yson/writer.h>

#include <library/cpp/json/json_reader.h>
#include <library/cpp/json/json_value.h>

#include <util/stream/input.h>
#include <util/stream/output.h>
#include <util/stream/str.h>

#include <utility>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

// Serializes |nodes| as a binary YSON list fragment: every element is emitted
// as one list item, in the order in which it appears in |nodes|.
// Returns the contents accumulated in the in-memory output stream.
TString NodeListToYsonString(const TNode::TListType& nodes)
{
    TStringStream output;
    ::NYson::TYsonWriter ysonWriter(
        &output,
        ::NYson::EYsonFormat::Binary,
        ::NYson::EYsonType::ListFragment);

    auto fluentList = BuildYsonListFluently(&ysonWriter);
    for (auto it = nodes.begin(); it != nodes.end(); ++it) {
        fluentList.Item().Value(*it);
    }

    return output.Str();
}

// Converts a rich YPath into a TNode by running Serialize() over it
// with a node builder that writes into the returned node.
TNode PathToNode(const TRichYPath& path)
{
    TNode pathNode;
    TNodeBuilder nodeBuilder(&pathNode);
    Serialize(path, &nodeBuilder);
    return pathNode;
}

// Wraps the node representation of |path| (see PathToNode) into a node
// with the single key "path".
TNode PathToParamNode(const TRichYPath& path)
{
    TNode params;
    params("path", PathToNode(path));
    return params;
}

// Builds a YSON string holding a map with the single key "attributes"
// whose value is |node|.
TString AttributesToYsonString(const TNode& node)
{
    TString yson = BuildYsonStringFluently()
        .BeginMap()
            .Item("attributes").Value(node)
        .EndMap();
    return yson;
}

// Builds a YSON string holding a map with the single key "attributes"
// whose value is the serialized |filter|.
TString AttributeFilterToYsonString(const TAttributeFilter& filter)
{
    TString yson = BuildYsonStringFluently()
        .BeginMap()
            .Item("attributes").Value(filter)
        .EndMap();
    return yson;
}

// Converts a table schema into a TNode by running Serialize() over it
// with a node builder that writes into the returned node.
TNode NodeFromTableSchema(const TTableSchema& schema)
{
    TNode schemaNode;
    TNodeBuilder nodeBuilder(&schemaNode);
    Serialize(schema, &nodeBuilder);
    return schemaNode;
}

// Recursively merges |src| into |dst|; whenever the two conflict, |src| wins.
//
// Rules:
//   * If both nodes are maps, every item of |src| is merged into |dst|:
//     a key already present in |dst| is merged recursively, a missing key is
//     copied over. Keys that exist only in |dst| are left as they are.
//     Attributes of the two map nodes themselves are not examined here.
//   * Otherwise, if both nodes have the same type and |src| has attributes,
//     |dst| is replaced by a copy of |src| and its attributes are set to
//     the attributes of the old |dst| merged (by these same rules) with
//     the attributes of |src|.
//   * In every other case |dst| is simply overwritten with a copy of |src|.
void MergeNodes(TNode& dst, const TNode& src)
{
    if (dst.IsMap() && src.IsMap()) {
        auto& dstMap = dst.AsMap();
        for (const auto& srcItem : src.AsMap()) {
            auto dstItem = dstMap.find(srcItem.first);
            if (dstItem == dstMap.end()) {
                dstMap[srcItem.first] = srcItem.second;
            } else {
                MergeNodes(dstItem->second, srcItem.second);
            }
        }
        return;
    }

    if (dst.GetType() != src.GetType() || !src.HasAttributes()) {
        dst = src;
        return;
    }

    // The attributes of |dst| must be saved before the assignment below
    // replaces |dst| with |src|.
    TNode mergedAttributes = dst.GetAttributes();
    MergeNodes(mergedAttributes, src.GetAttributes());
    dst = src;
    // The local is not used afterwards, so move it instead of copying
    // the whole attribute tree one more time.
    dst.Attributes() = std::move(mergedAttributes);
}

// Prepends |pathPrefix| to |path| unless |path| starts with "//" or "#",
// in which case |path| is returned unchanged.
TYPath AddPathPrefix(const TYPath& path, const TString& pathPrefix)
{
    const bool needsPrefix = !path.StartsWith("//") && !path.StartsWith("#");
    if (needsPrefix) {
        return pathPrefix + path;
    }
    return path;
}

// Returns the name of the table write command for the given API version:
// "write" for "v2", "write_table" for anything else.
TString GetWriteTableCommand(const TString& apiVersion)
{
    if (apiVersion == "v2") {
        return "write";
    }
    return "write_table";
}

// Returns the name of the table read command for the given API version:
// "read" for "v2", "read_table" for anything else.
TString GetReadTableCommand(const TString& apiVersion)
{
    if (apiVersion == "v2") {
        return "read";
    }
    return "read_table";
}

// Returns the name of the file write command for the given API version:
// "upload" for "v2", "write_file" for anything else.
TString GetWriteFileCommand(const TString& apiVersion)
{
    if (apiVersion == "v2") {
        return "upload";
    }
    return "write_file";
}

// Returns the name of the file read command for the given API version:
// "download" for "v2", "read_file" for anything else.
TString GetReadFileCommand(const TString& apiVersion)
{
    if (apiVersion == "v2") {
        return "download";
    }
    return "read_file";
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT