diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/http/host_manager.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/http/host_manager.cpp')
-rw-r--r-- | yt/cpp/mapreduce/http/host_manager.cpp | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http/host_manager.cpp b/yt/cpp/mapreduce/http/host_manager.cpp new file mode 100644 index 0000000000..a239dde769 --- /dev/null +++ b/yt/cpp/mapreduce/http/host_manager.cpp @@ -0,0 +1,140 @@ +#include "host_manager.h" + +#include "context.h" +#include "helpers.h" +#include "http.h" +#include "http_client.h" +#include "requests.h" + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <library/cpp/json/json_reader.h> + +#include <util/generic/guid.h> +#include <util/generic/vector.h> +#include <util/generic/singleton.h> +#include <util/generic/ymath.h> + +#include <util/random/random.h> + +#include <util/string/vector.h> + +namespace NYT::NPrivate { + +//////////////////////////////////////////////////////////////////////////////// + +static TVector<TString> ParseJsonStringArray(const TString& response) +{ + NJson::TJsonValue value; + TStringInput input(response); + NJson::ReadJsonTree(&input, &value); + + const NJson::TJsonValue::TArray& array = value.GetArray(); + TVector<TString> result; + result.reserve(array.size()); + for (size_t i = 0; i < array.size(); ++i) { + result.push_back(array[i].GetString()); + } + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +class THostManager::TClusterHostList +{ +public: + explicit TClusterHostList(TVector<TString> hosts) + : Hosts_(std::move(hosts)) + , Timestamp_(TInstant::Now()) + { } + + explicit TClusterHostList(std::exception_ptr error) + : Error_(std::move(error)) + , Timestamp_(TInstant::Now()) + { } + + TString ChooseHostOrThrow() const + { + if (Error_) { + std::rethrow_exception(Error_); + } + + if (Hosts_.empty()) { + ythrow yexception() << "fetched list of proxies is empty"; + } + + return Hosts_[RandomNumber<size_t>(Hosts_.size())]; + } + + TDuration GetAge() const + { + return TInstant::Now() - Timestamp_; + } + +private: + TVector<TString> Hosts_; + std::exception_ptr Error_; + TInstant Timestamp_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +THostManager& THostManager::Get() +{ + return *Singleton<THostManager>(); +} + +void THostManager::Reset() +{ + auto guard = Guard(Lock_); + ClusterHosts_.clear(); +} + +TString THostManager::GetProxyForHeavyRequest(const TClientContext& context) +{ + auto cluster = context.ServerName; + { + auto guard = Guard(Lock_); + auto it = ClusterHosts_.find(cluster); + if (it != ClusterHosts_.end() && it->second.GetAge() < context.Config->HostListUpdateInterval) { + return it->second.ChooseHostOrThrow(); + } + } + + auto hostList = GetHosts(context); + auto result = hostList.ChooseHostOrThrow(); + { + auto guard = Guard(Lock_); + ClusterHosts_.emplace(cluster, std::move(hostList)); + } + return result; +} + +THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& context) +{ + TString hostsEndpoint = context.Config->Hosts; + while (hostsEndpoint.StartsWith("/")) { + hostsEndpoint = hostsEndpoint.substr(1); + } + THttpHeader header("GET", hostsEndpoint, false); + + try { + auto hostName = context.ServerName; + auto requestId = CreateGuidAsString(); + // TODO: we need to set socket timeout here + auto response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); + auto hosts = ParseJsonStringArray(response->GetResponse()); + for (auto& host : hosts) { + host = CreateHostNameWithPort(host, context); + } + return TClusterHostList(std::move(hosts)); + } catch (const std::exception& e) { + return TClusterHostList(std::current_exception()); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NPrivate |