1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
#include "operation_helpers.h"
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <util/string/builder.h>
#include <util/system/mutex.h>
#include <util/system/rwlock.h>
namespace NYT::NDetail {
////////////////////////////////////////////////////////////////////////////////
ui64 RoundUpFileSize(ui64 size)
{
constexpr ui64 roundUpTo = 4ull << 10;
return (size + roundUpTo - 1) & ~(roundUpTo - 1);
}
bool UseLocalModeOptimization(
const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& clientRetryPolicy)
{
if (!context.Config->EnableLocalModeOptimization) {
return false;
}
static THashMap<TString, bool> localModeMap;
static TRWMutex mutex;
{
TReadGuard guard(mutex);
auto it = localModeMap.find(context.ServerName);
if (it != localModeMap.end()) {
return it->second;
}
}
bool isLocalMode = false;
TString localModeAttr("//sys/@local_mode_fqdn");
// We don't want to pollute logs with errors about failed request,
// so we check if path exists before getting it.
auto exists = RequestWithRetry<bool>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
[&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
return rawClient->Exists(
TTransactionId(),
localModeAttr,
TExistsOptions().ReadFrom(EMasterReadKind::Cache));
});
if (exists)
{
auto fqdnNode = RequestWithRetry<TNode>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
[&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
return rawClient->TryGet(
TTransactionId(),
localModeAttr,
TGetOptions().ReadFrom(EMasterReadKind::Cache));
});
if (!fqdnNode.IsUndefined()) {
auto fqdn = fqdnNode.AsString();
isLocalMode = (fqdn == TProcessState::Get()->FqdnHostName);
YT_LOG_DEBUG("Checking local mode; LocalModeFqdn: %v FqdnHostName: %v IsLocalMode: %v",
fqdn,
TProcessState::Get()->FqdnHostName,
isLocalMode ? "true" : "false");
}
}
{
TWriteGuard guard(mutex);
localModeMap[context.ServerName] = isLocalMode;
}
return isLocalMode;
}
TString GetOperationWebInterfaceUrl(TStringBuf serverName, TOperationId operationId)
{
serverName.ChopSuffix(":80");
serverName.ChopSuffix(".yt.yandex-team.ru");
serverName.ChopSuffix(".yt.yandex.net");
return ::TStringBuilder() << "https://yt.yandex-team.ru/" << serverName <<
"/operations/" << GetGuidAsString(operationId);
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
|