aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/interface/config.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/interface/config.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/interface/config.cpp')
-rw-r--r--yt/cpp/mapreduce/interface/config.cpp321
1 files changed, 321 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/config.cpp b/yt/cpp/mapreduce/interface/config.cpp
new file mode 100644
index 0000000000..b474dc0844
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/config.cpp
@@ -0,0 +1,321 @@
+#include "config.h"
+
+#include "operation.h"
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/svnversion/svnversion.h>
+
+#include <library/cpp/yson/node/node_builder.h>
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/yson/json/yson2json_adapter.h>
+
+#include <util/string/strip.h>
+#include <util/folder/dirut.h>
+#include <util/folder/path.h>
+#include <util/stream/file.h>
+#include <util/generic/singleton.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+#include <util/string/type.h>
+#include <util/system/hostname.h>
+#include <util/system/user.h>
+#include <util/system/env.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool TConfig::GetBool(const char* var, bool defaultValue)
+{
+ TString val = GetEnv(var, "");
+ if (val.empty()) {
+ return defaultValue;
+ }
+ return IsTrue(val);
+}
+
+int TConfig::GetInt(const char* var, int defaultValue)
+{
+ int result = 0;
+ TString val = GetEnv(var, "");
+ if (val.empty()) {
+ return defaultValue;
+ }
+ try {
+ result = FromString<int>(val);
+ } catch (const yexception& e) {
+ ythrow yexception() << "Cannot parse " << var << '=' << val << " as integer: " << e.what();
+ }
+ return result;
+}
+
+TDuration TConfig::GetDuration(const char* var, TDuration defaultValue)
+{
+ return TDuration::Seconds(GetInt(var, defaultValue.Seconds()));
+}
+
+EEncoding TConfig::GetEncoding(const char* var)
+{
+ const TString encodingName = GetEnv(var, "identity");
+ EEncoding encoding;
+ if (TryFromString(encodingName, encoding)) {
+ return encoding;
+ } else {
+ ythrow yexception() << var << ": encoding '" << encodingName << "' is not supported";
+ }
+}
+
+ EUploadDeduplicationMode TConfig::GetUploadingDeduplicationMode(
+ const char* var,
+ EUploadDeduplicationMode defaultValue)
+{
+ const TString deduplicationMode = GetEnv(var, TEnumTraits<EUploadDeduplicationMode>::ToString(defaultValue));
+ return TEnumTraits<EUploadDeduplicationMode>::FromString(deduplicationMode);
+}
+
+void TConfig::ValidateToken(const TString& token)
+{
+ for (size_t i = 0; i < token.size(); ++i) {
+ ui8 ch = token[i];
+ if (ch < 0x21 || ch > 0x7e) {
+ ythrow yexception() << "Incorrect token character '" << ch << "' at position " << i;
+ }
+ }
+}
+
+TString TConfig::LoadTokenFromFile(const TString& tokenPath)
+{
+ TFsPath path(tokenPath);
+ return path.IsFile() ? Strip(TIFStream(path).ReadAll()) : TString();
+}
+
+TNode TConfig::LoadJsonSpec(const TString& strSpec)
+{
+ TNode spec;
+ TStringInput input(strSpec);
+ TNodeBuilder builder(&spec);
+ TYson2JsonCallbacksAdapter callbacks(&builder);
+
+ Y_ENSURE(NJson::ReadJson(&input, &callbacks), "Cannot parse json spec: " << strSpec);
+ Y_ENSURE(spec.IsMap(), "Json spec is not a map");
+
+ return spec;
+}
+
+TRichYPath TConfig::LoadApiFilePathOptions(const TString& ysonMap)
+{
+ TNode attributes;
+ try {
+ attributes = NodeFromYsonString(ysonMap);
+ } catch (const yexception& exc) {
+ ythrow yexception() << "Failed to parse YT_API_FILE_PATH_OPTIONS (it must be yson map): " << exc;
+ }
+ TNode pathNode = "";
+ pathNode.Attributes() = attributes;
+ TRichYPath path;
+ Deserialize(path, pathNode);
+ return path;
+}
+
+void TConfig::LoadToken()
+{
+ if (auto envToken = GetEnv("YT_TOKEN")) {
+ Token = envToken;
+ } else if (auto envToken = GetEnv("YT_SECURE_VAULT_YT_TOKEN")) {
+ // If this code runs inside an vanilla peration in YT
+ // it should not use regular environment variable `YT_TOKEN`
+ // because it would be visible in UI.
+ // Token should be passed via `secure_vault` parameter in operation spec.
+ Token = envToken;
+ } else if (auto tokenPath = GetEnv("YT_TOKEN_PATH")) {
+ Token = LoadTokenFromFile(tokenPath);
+ } else {
+ Token = LoadTokenFromFile(GetHomeDir() + "/.yt/token");
+ }
+ ValidateToken(Token);
+}
+
+void TConfig::LoadSpec()
+{
+ TString strSpec = GetEnv("YT_SPEC", "{}");
+ Spec = LoadJsonSpec(strSpec);
+
+ strSpec = GetEnv("YT_TABLE_WRITER", "{}");
+ TableWriter = LoadJsonSpec(strSpec);
+}
+
+void TConfig::LoadTimings()
+{
+ ConnectTimeout = GetDuration("YT_CONNECT_TIMEOUT",
+ TDuration::Seconds(10));
+
+ SocketTimeout = GetDuration("YT_SOCKET_TIMEOUT",
+ GetDuration("YT_SEND_RECEIVE_TIMEOUT", // common
+ TDuration::Seconds(60)));
+
+ AddressCacheExpirationTimeout = TDuration::Minutes(15);
+
+ CacheLockTimeoutPerGb = TDuration::MilliSeconds(1000.0 * 1_GB * 8 / 20_MB); // 20 Mbps = 20 MBps / 8.
+
+ TxTimeout = GetDuration("YT_TX_TIMEOUT",
+ TDuration::Seconds(120));
+
+ PingTimeout = GetDuration("YT_PING_TIMEOUT",
+ TDuration::Seconds(5));
+
+ PingInterval = GetDuration("YT_PING_INTERVAL",
+ TDuration::Seconds(5));
+
+ WaitLockPollInterval = TDuration::Seconds(5);
+
+ RetryInterval = GetDuration("YT_RETRY_INTERVAL",
+ TDuration::Seconds(3));
+
+ ChunkErrorsRetryInterval = GetDuration("YT_CHUNK_ERRORS_RETRY_INTERVAL",
+ TDuration::Seconds(60));
+
+ RateLimitExceededRetryInterval = GetDuration("YT_RATE_LIMIT_EXCEEDED_RETRY_INTERVAL",
+ TDuration::Seconds(60));
+
+ StartOperationRetryInterval = GetDuration("YT_START_OPERATION_RETRY_INTERVAL",
+ TDuration::Seconds(60));
+
+ HostListUpdateInterval = TDuration::Seconds(60);
+}
+
+void TConfig::Reset()
+{
+ Hosts = GetEnv("YT_HOSTS", "hosts");
+ Pool = GetEnv("YT_POOL");
+ Prefix = GetEnv("YT_PREFIX");
+ ApiVersion = GetEnv("YT_VERSION", "v3");
+ LogLevel = GetEnv("YT_LOG_LEVEL", "error");
+
+ ContentEncoding = GetEncoding("YT_CONTENT_ENCODING");
+ AcceptEncoding = GetEncoding("YT_ACCEPT_ENCODING");
+
+ GlobalTxId = GetEnv("YT_TRANSACTION", "");
+
+ UseAsyncTxPinger = false;
+ AsyncHttpClientThreads = 1;
+ AsyncTxPingerPoolThreads = 1;
+
+ ForceIpV4 = GetBool("YT_FORCE_IPV4");
+ ForceIpV6 = GetBool("YT_FORCE_IPV6");
+ UseHosts = GetBool("YT_USE_HOSTS", true);
+
+ LoadToken();
+ LoadSpec();
+ LoadTimings();
+
+ CacheUploadDeduplicationMode = GetUploadingDeduplicationMode("YT_UPLOAD_DEDUPLICATION", EUploadDeduplicationMode::Host);
+
+ RetryCount = Max(GetInt("YT_RETRY_COUNT", 10), 1);
+ ReadRetryCount = Max(GetInt("YT_READ_RETRY_COUNT", 30), 1);
+ StartOperationRetryCount = Max(GetInt("YT_START_OPERATION_RETRY_COUNT", 30), 1);
+
+ RemoteTempFilesDirectory = GetEnv("YT_FILE_STORAGE",
+ "//tmp/yt_wrapper/file_storage");
+ RemoteTempTablesDirectory = GetEnv("YT_TEMP_TABLES_STORAGE",
+ "//tmp/yt_wrapper/table_storage");
+ RemoteTempTablesDirectory = GetEnv("YT_TEMP_DIR",
+ RemoteTempTablesDirectory);
+
+ InferTableSchema = false;
+
+ UseClientProtobuf = GetBool("YT_USE_CLIENT_PROTOBUF", false);
+ NodeReaderFormat = ENodeReaderFormat::Auto;
+ ProtobufFormatWithDescriptors = true;
+
+ MountSandboxInTmpfs = GetBool("YT_MOUNT_SANDBOX_IN_TMPFS");
+
+ ApiFilePathOptions = LoadApiFilePathOptions(GetEnv("YT_API_FILE_PATH_OPTIONS", "{}"));
+
+ ConnectionPoolSize = GetInt("YT_CONNECTION_POOL_SIZE", 16);
+
+ TraceHttpRequestsMode = FromString<ETraceHttpRequestsMode>(to_lower(GetEnv("YT_TRACE_HTTP_REQUESTS", "never")));
+
+ CommandsWithFraming = {
+ "read_table",
+ "get_table_columnar_statistics",
+ "get_job_input",
+ "concatenate",
+ "partition_tables",
+ };
+}
+
+TConfig::TConfig()
+{
+ Reset();
+}
+
+TConfigPtr TConfig::Get()
+{
+ struct TConfigHolder
+ {
+ TConfigHolder()
+ : Config(::MakeIntrusive<TConfig>())
+ { }
+
+ TConfigPtr Config;
+ };
+
+ return Singleton<TConfigHolder>()->Config;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TProcessState::TProcessState()
+{
+ try {
+ FqdnHostName = ::FQDNHostName();
+ } catch (const yexception& e) {
+ try {
+ FqdnHostName = ::HostName();
+ } catch (const yexception& e) {
+ ythrow yexception() << "Cannot get fqdn and host name: " << e.what();
+ }
+ }
+
+ try {
+ UserName = ::GetUsername();
+ } catch (const yexception& e) {
+ ythrow yexception() << "Cannot get user name: " << e.what();
+ }
+
+ Pid = static_cast<int>(getpid());
+
+ if (!ClientVersion) {
+ ClientVersion = ::TStringBuilder() << "YT C++ native " << GetProgramCommitId();
+ }
+}
+
+static TString CensorString(TString input)
+{
+ static const TString prefix = "AQAD-";
+ if (input.find(prefix) == TString::npos) {
+ return input;
+ } else {
+ return TString(input.size(), '*');
+ }
+}
+
+void TProcessState::SetCommandLine(int argc, const char* argv[])
+{
+ for (int i = 0; i < argc; ++i) {
+ CommandLine.push_back(argv[i]);
+ CensoredCommandLine.push_back(CensorString(CommandLine.back()));
+ }
+}
+
+TProcessState* TProcessState::Get()
+{
+ return Singleton<TProcessState>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT