aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation_helpers.cpp
blob: a15b56cf75c6f1070126a34b76f589f2c9e24cf6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include "operation_helpers.h"

#include <yt/cpp/mapreduce/common/retry_lib.h>

#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>

#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/raw_client.h>

#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <yt/cpp/mapreduce/raw_client/raw_requests.h>

#include <util/string/builder.h>

#include <util/system/mutex.h>
#include <util/system/rwlock.h>

namespace NYT::NDetail {

////////////////////////////////////////////////////////////////////////////////

// Rounds `size` up to the nearest multiple of 4 KiB (4096).
// A value that is already a multiple of 4096 (including 0) is returned unchanged.
ui64 RoundUpFileSize(ui64 size)
{
    constexpr ui64 granularity = 4096;
    const ui64 tail = size % granularity;
    if (tail == 0) {
        return size;
    }
    // Pad with the bytes missing to reach the next 4 KiB boundary.
    return size + (granularity - tail);
}

bool UseLocalModeOptimization(
    const IRawClientPtr& rawClient,
    const TClientContext& context,
    const IClientRetryPolicyPtr& clientRetryPolicy)
{
    if (!context.Config->EnableLocalModeOptimization) {
        return false;
    }

    static THashMap<TString, bool> localModeMap;
    static TRWMutex mutex;

    {
        TReadGuard guard(mutex);
        auto it = localModeMap.find(context.ServerName);
        if (it != localModeMap.end()) {
            return it->second;
        }
    }

    bool isLocalMode = false;
    TString localModeAttr("//sys/@local_mode_fqdn");
    // We don't want to pollute logs with errors about failed request,
    // so we check if path exists before getting it.
    auto exists = RequestWithRetry<bool>(
        clientRetryPolicy->CreatePolicyForGenericRequest(),
        [&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
            return rawClient->Exists(
                TTransactionId(),
                localModeAttr,
                TExistsOptions().ReadFrom(EMasterReadKind::Cache));
        });
    if (exists)
    {
        auto fqdnNode = RequestWithRetry<TNode>(
            clientRetryPolicy->CreatePolicyForGenericRequest(),
            [&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
                return rawClient->TryGet(
                    TTransactionId(),
                    localModeAttr,
                    TGetOptions().ReadFrom(EMasterReadKind::Cache));
            });
        if (!fqdnNode.IsUndefined()) {
            auto fqdn = fqdnNode.AsString();
            isLocalMode = (fqdn == TProcessState::Get()->FqdnHostName);
            YT_LOG_DEBUG("Checking local mode; LocalModeFqdn: %v FqdnHostName: %v IsLocalMode: %v",
                fqdn,
                TProcessState::Get()->FqdnHostName,
                isLocalMode ? "true" : "false");
        }
    }

    {
        TWriteGuard guard(mutex);
        localModeMap[context.ServerName] = isLocalMode;
    }

    return isLocalMode;
}

// Builds the web interface URL of an operation:
//   https://yt.yandex-team.ru/<cluster>/operations/<operation guid>
// where <cluster> is `serverName` with a trailing ":80", then a trailing
// ".yt.yandex-team.ru", then a trailing ".yt.yandex.net" removed (each only if present).
TString GetOperationWebInterfaceUrl(TStringBuf serverName, TOperationId operationId)
{
    // Order matters: the port has to go first so that the domain suffix becomes the tail.
    const TStringBuf strippedSuffixes[] = {
        ":80",
        ".yt.yandex-team.ru",
        ".yt.yandex.net",
    };
    for (const auto& suffix : strippedSuffixes) {
        serverName.ChopSuffix(suffix);
    }

    ::TStringBuilder url;
    url << "https://yt.yandex-team.ru/" << serverName;
    url << "/operations/" << GetGuidAsString(operationId);
    return url;
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NDetail