1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
|
#pragma once
#include "fwd.h"
#include "common.h"
#include <library/cpp/yt/misc/enum.h>
#include <library/cpp/yson/node/node.h>
#include <util/generic/maybe.h>
#include <util/generic/string.h>
#include <util/generic/hash_set.h>
#include <util/datetime/base.h>
namespace NYT {
enum EEncoding : int
{
E_IDENTITY /* "identity" */,
E_GZIP /* "gzip" */,
E_BROTLI /* "br" */,
E_Z_LZ4 /* "z-lz4" */,
};
enum class ENodeReaderFormat : int
{
Yson, // Always use YSON format,
Skiff, // Always use Skiff format, throw exception if it's not possible (non-strict schema, dynamic table etc.)
Auto, // Use Skiff format if it's possible, YSON otherwise
};
enum class ETraceHttpRequestsMode
{
// Never dump http requests.
Never /* "never" */,
// Dump failed http requests.
Error /* "error" */,
// Dump all http requests.
Always /* "always" */,
};
DEFINE_ENUM(EUploadDeduplicationMode,
// For each file only one process' thread from all possible hosts can upload it to the file cache at the same time.
// The others will wait for the uploading to finish and use already cached file.
((Global) (0))
// For each file and each particular host only one process' thread can upload it to the file cache at the same time.
// The others will wait for the uploading to finish and use already cached file.
((Host) (1))
// All processes' threads will upload a file to the cache concurrently.
((Disabled) (2))
);
////////////////////////////////////////////////////////////////////////////////
/// Enum describing possible versions of table writer implemetation.
enum class ETableWriterVersion
{
/// Allow library to choose version of writer.
Auto,
/// Stable but slower version of writer.
V1,
/// Unstable but faster version of writer (going to be default in the future).
V2,
};
////////////////////////////////////////////////////////////////////////////////
struct TConfig
: public TThrRefBase
{
TString Hosts;
TString Pool;
TString Token;
TString Prefix;
TString ApiVersion;
TString LogLevel;
TString LogPath;
///
/// For historical reasons mapreduce client uses its own logging system.
///
/// If this options is set to true library switches to yt/yt/core logging by default.
/// But if user calls @ref NYT::SetLogger library switches back to logger provided by user
/// (except for messages from yt/yt/core).
///
/// This is temporary option. In future it would be true by default, and then removed.
///
/// https://st.yandex-team.ru/YT-23645
bool LogUseCore = false;
// Compression for data that is sent to YT cluster.
EEncoding ContentEncoding;
// Compression for data that is read from YT cluster.
EEncoding AcceptEncoding;
TString GlobalTxId;
bool ForceIpV4;
bool ForceIpV6;
bool UseHosts;
TDuration HostListUpdateInterval;
TNode Spec;
TNode TableWriter;
TDuration ConnectTimeout;
TDuration SocketTimeout;
TDuration AddressCacheExpirationTimeout;
TDuration TxTimeout;
TDuration PingTimeout;
TDuration PingInterval;
int AsyncHttpClientThreads;
int AsyncTxPingerPoolThreads;
// How often should we poll for lock state
TDuration WaitLockPollInterval;
TDuration RetryInterval;
TDuration ChunkErrorsRetryInterval;
TDuration RateLimitExceededRetryInterval;
TDuration StartOperationRetryInterval;
int RetryCount;
int ReadRetryCount;
int StartOperationRetryCount;
/// @brief Period for checking status of running operation.
TDuration OperationTrackerPollPeriod = TDuration::Seconds(5);
TString RemoteTempFilesDirectory;
TString RemoteTempTablesDirectory;
// @brief Keep temp tables produced by TTempTable (despite their name). Should not be used in user programs,
// but may be useful for setting via environment variable for debugging purposes.
bool KeepTempTables = false;
//
// Infer schemas for nonexstent tables from typed rows (e.g. protobuf)
// when writing from operation or client writer.
// This options can be overridden in TOperationOptions and TTableWriterOptions.
bool InferTableSchema;
bool UseClientProtobuf;
ENodeReaderFormat NodeReaderFormat;
bool ProtobufFormatWithDescriptors;
int ConnectionPoolSize;
/// Defines replication factor that is used for files that are uploaded to YT
/// to use them in operations.
int FileCacheReplicationFactor = 10;
/// @brief Used when waiting for other process which uploads the same file to the file cache.
///
/// If CacheUploadDeduplicationMode is not Disabled, current process can wait for some other
/// process which is uploading the same file. This value is proportional to the timeout of waiting,
/// actual timeout computes as follows: fileSizeGb * CacheLockTimeoutPerGb.
/// Default timeout assumes that host has uploading speed equal to 20 Mb/s.
/// If timeout was reached, the file will be uploaded by current process without any other waits.
TDuration CacheLockTimeoutPerGb;
/// @brief Used to prevent concurrent uploading of the same file to the file cache.
/// NB: Each mode affects only users with the same mode enabled.
EUploadDeduplicationMode CacheUploadDeduplicationMode;
// @brief Minimum byte size for files to undergo deduplication at upload
i64 CacheUploadDeduplicationThreshold;
bool MountSandboxInTmpfs;
/// @brief Set upload options (e.g.) for files created by library.
///
/// Path itself is always ignored but path options (e.g. `BypassArtifactCache`) are used when uploading system files:
/// cppbinary, job state, etc
TRichYPath ApiFilePathOptions;
// Testing options, should never be used in user programs.
bool UseAbortableResponse = false;
bool EnableDebugMetrics = false;
//
// There is optimization used with local YT that enables to skip binary upload and use real binary path.
// When EnableLocalModeOptimization is set to false this optimization is completely disabled.
bool EnableLocalModeOptimization = true;
//
// If you want see stderr even if you jobs not failed set this true.
bool WriteStderrSuccessfulJobs = false;
//
// This configuration is useful for debug.
// If set to ETraceHttpRequestsMode::Error library will dump all http error requests.
// If set to ETraceHttpRequestsMode::All library will dump all http requests.
// All tracing occurres as DEBUG level logging.
ETraceHttpRequestsMode TraceHttpRequestsMode = ETraceHttpRequestsMode::Never;
TString SkynetApiHost;
// Sets SO_PRIORITY option on the socket
TMaybe<int> SocketPriority;
// Framing settings
// (cf. https://ytsaurus.tech/docs/en/user-guide/proxy/http-reference#framing).
THashSet<TString> CommandsWithFraming;
/// Which implemetation of table writer to use.
ETableWriterVersion TableWriterVersion = ETableWriterVersion::Auto;
/// Redirects stdout to stderr for jobs.
bool RedirectStdoutToStderr = false;
static bool GetBool(const char* var, bool defaultValue = false);
static int GetInt(const char* var, int defaultValue);
static TDuration GetDuration(const char* var, TDuration defaultValue);
static EEncoding GetEncoding(const char* var);
static EUploadDeduplicationMode GetUploadingDeduplicationMode(
const char* var,
EUploadDeduplicationMode defaultValue);
static void ValidateToken(const TString& token);
static TString LoadTokenFromFile(const TString& tokenPath);
static TNode LoadJsonSpec(const TString& strSpec);
static TRichYPath LoadApiFilePathOptions(const TString& ysonMap);
void LoadToken();
void LoadSpec();
void LoadTimings();
void Reset();
TConfig();
static TConfigPtr Get();
};
////////////////////////////////////////////////////////////////////////////////
struct TProcessState
{
TString FqdnHostName;
TString UserName;
int Pid;
TString ClientVersion;
TString BinaryPath;
TString BinaryName;
TProcessState();
static TProcessState* Get();
};
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
|