diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-01-23 14:26:34 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-01-23 14:26:34 +0300 |
commit | ab6ff0609c96b8119ee8918ca0216a597febf97a (patch) | |
tree | 8fec75d4fb74d1430c3a24bdaf20dfe81eb20556 | |
parent | 0084aa2e47d04b92fd5a741fc7ed7495b02d9ada (diff) | |
download | ydb-ab6ff0609c96b8119ee8918ca0216a597febf97a.tar.gz |
Caching FQDN IP resolution for lib curl usage
Caching FQDN IP resolution for lib curl usage
12 files changed, 771 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/CMakeLists.darwin.txt b/ydb/library/yql/providers/common/http_gateway/CMakeLists.darwin.txt index b1fcb29d61..34c5592024 100644 --- a/ydb/library/yql/providers/common/http_gateway/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/common/http_gateway/CMakeLists.darwin.txt @@ -7,6 +7,7 @@ add_subdirectory(mock) +add_subdirectory(ut) add_library(providers-common-http_gateway) target_compile_options(providers-common-http_gateway PRIVATE @@ -21,6 +22,7 @@ target_link_libraries(providers-common-http_gateway PUBLIC library-cpp-retry providers-common-proto yql-public-issue + yql-utils-log ) target_sources(providers-common-http_gateway PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp diff --git a/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux-aarch64.txt index d9bc8dcff3..5f2df2978a 100644 --- a/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux-aarch64.txt @@ -7,6 +7,7 @@ add_subdirectory(mock) +add_subdirectory(ut) add_library(providers-common-http_gateway) target_compile_options(providers-common-http_gateway PRIVATE @@ -22,6 +23,7 @@ target_link_libraries(providers-common-http_gateway PUBLIC library-cpp-retry providers-common-proto yql-public-issue + yql-utils-log ) target_sources(providers-common-http_gateway PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp diff --git a/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux.txt b/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux.txt index d9bc8dcff3..5f2df2978a 100644 --- a/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/common/http_gateway/CMakeLists.linux.txt @@ -7,6 +7,7 @@ add_subdirectory(mock) +add_subdirectory(ut) add_library(providers-common-http_gateway) target_compile_options(providers-common-http_gateway PRIVATE @@ -22,6 +23,7 @@ target_link_libraries(providers-common-http_gateway PUBLIC library-cpp-retry providers-common-proto yql-public-issue + yql-utils-log ) target_sources(providers-common-http_gateway PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp diff --git a/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.darwin.txt b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.darwin.txt new file mode 100644 index 0000000000..ace0cf32d2 --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.darwin.txt @@ -0,0 +1,69 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-providers-common-http_gateway-ut) +target_compile_options(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway +) +target_link_libraries(ydb-library-yql-providers-common-http_gateway-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + providers-common-http_gateway + cpp-testing-unittest +) +target_link_options(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway_ut.cpp +) +set_property( + TARGET + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-library-yql-providers-common-http_gateway-ut + TEST_TARGET + ydb-library-yql-providers-common-http_gateway-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + PROCESSORS + 1 +) +vcs_info(ydb-library-yql-providers-common-http_gateway-ut) diff --git a/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..e1c6519589 --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,72 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-providers-common-http_gateway-ut) +target_compile_options(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway +) +target_link_libraries(ydb-library-yql-providers-common-http_gateway-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + providers-common-http_gateway + cpp-testing-unittest +) +target_link_options(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway_ut.cpp +) +set_property( + TARGET + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-library-yql-providers-common-http_gateway-ut + TEST_TARGET + ydb-library-yql-providers-common-http_gateway-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + PROCESSORS + 1 +) +vcs_info(ydb-library-yql-providers-common-http_gateway-ut) diff --git a/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.linux.txt b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.linux.txt new file mode 100644 index 0000000000..c5a0370485 --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.linux.txt @@ -0,0 +1,74 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-providers-common-http_gateway-ut) +target_compile_options(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway +) +target_link_libraries(ydb-library-yql-providers-common-http_gateway-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + providers-common-http_gateway + cpp-testing-unittest +) +target_link_options(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-library-yql-providers-common-http_gateway-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway_ut.cpp +) +set_property( + TARGET + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-library-yql-providers-common-http_gateway-ut + TEST_TARGET + ydb-library-yql-providers-common-http_gateway-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-library-yql-providers-common-http_gateway-ut + PROPERTY + PROCESSORS + 1 +) +vcs_info(ydb-library-yql-providers-common-http_gateway-ut) diff --git a/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.txt b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.txt new file mode 100644 index 0000000000..bede1861df --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/ut/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway.h new file mode 100644 index 0000000000..6da4bba88a --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway.h @@ -0,0 +1,295 @@ +#pragma once + +#include <contrib/libs/curl/include/curl/curl.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <util/generic/scope.h> +#include <util/network/address.h> +#include <util/network/socket.h> +#include <util/string/builder.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/log/log_component.h> + +#include <condition_variable> +#include <mutex> +#include <thread> +#include <unordered_map> + +namespace NYql { +namespace { + +struct TDNSResolver { + std::vector<NAddr::TOpaqueAddr> Resolve(const TString& host, ui16 port) const { + TNetworkAddress na(host, port); + std::vector<NAddr::TOpaqueAddr> result; + for (auto i = na.Begin(); i != na.End(); ++i) { + result.emplace_back(NAddr::TOpaqueAddr((*i).ai_addr)); + } + return result; + } +}; + +struct TResolutionTask { + TString Host; + ui16 Port = 0; + TString ExpectedIP; + TExplicitDNSRecord::ProtocolVersion ProtocolVersion = TExplicitDNSRecord_ProtocolVersion_ANY; +}; + +} // namespace + +template<typename Resolver = TDNSResolver> +class TDNSGateway { +public: + using TDNSCurlListPtr = std::shared_ptr<curl_slist>; + using TDNSConstCurlListPtr = const std::shared_ptr<const curl_slist>; + using TDNSResolutionTable = std::unordered_map<TString, std::vector<TString>>; + + static constexpr ui32 DEFAULT_UPDATE_INTERVAL = 60000; + + TDNSGateway( + const TDnsResolverConfig& dnsResolverConfig, + ::NMonitoring::TDynamicCounterPtr counters) + : IsStopped(false) + , UpdateInterval(std::chrono::milliseconds(dnsResolverConfig.GetRefreshMs())) + , TotalResolutionCounter( + counters->GetCounter("DNSGateway_TotalResolutions")) + , ResolutionSuccessCounter( + counters->GetCounter("DNSGateway_ResolutionSuccesses")) + , ResolutionErrorCounter( + counters->GetCounter("DNSGateway_ResolutionErrors")) + , ResolvedToNotExpectedIP( + counters->GetCounter("DNSGateway_ResolvedToNotExpectedIP")) { + if (dnsResolverConfig.HasRefreshMs()) { + UpdateInterval = std::chrono::milliseconds(dnsResolverConfig.GetRefreshMs()); + } + + for (auto& dnsResolverRecord: dnsResolverConfig.GetExplicitDNSRecord()) { + auto address = dnsResolverRecord.GetAddress(); + if (address) { + auto task = TResolutionTask{ + address, + static_cast<ui16>(dnsResolverRecord.GetPort()), + dnsResolverRecord.GetExpectedIP(), + dnsResolverRecord.GetProtocol()}; + + HostAddressedToResolve.emplace_back(task); + + if (task.ExpectedIP) { + auto hostname = TStringBuilder() + << task.Host << ":" << task.Port; + DnsResolutionTable.emplace( + std::move(hostname), + std::vector<TString>{task.ExpectedIP}); + } + } + } + + DnsCurlList = ConvertDNSTableToCurlList(); + YQL_CLOG(INFO, HttpGateway) + << "Filled DNS resolution table based on provided configuration"; + + UpdateResolutionTable(); + + Thread = std::thread([this]() { + auto lock = std::unique_lock{Sync}; + YQL_CLOG(DEBUG, HttpGateway) + << "DNS Gateway thread is going to start"; + while (true) { + if (IsStoppedConditionalVariable.wait_for( + lock, UpdateInterval, [this] { return IsStopped; })) { + YQL_CLOG(DEBUG, HttpGateway) + << "DNS Gateway thread is going to stop"; + break; + } + lock.unlock(); + Y_DEFER { + lock.lock(); + }; + + UpdateResolutionTable(); + } + YQL_CLOG(DEBUG, HttpGateway) << "DNS Gateway thread stopped"; + }); + } + + TDNSConstCurlListPtr GetDNSCurlList() { + auto lock = std::lock_guard{Sync}; + return DnsCurlList; + } + + ~TDNSGateway() { StopThread(); } + +private: + void SetDNSCurlList(TDNSCurlListPtr&& newDnsCurlList) { + auto lock = std::lock_guard{Sync}; + DnsCurlList = std::move(newDnsCurlList); + } + + void UpdateResolutionTable() { + YQL_CLOG(INFO, HttpGateway) << "Started DNS table update"; + + TDNSResolutionTable newResolutionTable; + for (const auto& record: HostAddressedToResolve) { + auto resolvedAddress = ResolveHostname( + record.Host, + record.Port, + record.ProtocolVersion); + + auto hostname = TStringBuilder() + << record.Host << ":" << record.Port; + if (!resolvedAddress.empty()) { + if (record.ExpectedIP && + std::find( + resolvedAddress.begin(), + resolvedAddress.end(), + record.ExpectedIP) == resolvedAddress.end()) { + ResolvedToNotExpectedIP->Inc(); + } + newResolutionTable.emplace( + std::move(hostname), std::move(resolvedAddress)); + } else { + if (DnsResolutionTable.contains(hostname)) { + newResolutionTable.emplace( + std::move(hostname), DnsResolutionTable.at(hostname)); + } + } + } + + if (newResolutionTable != DnsResolutionTable) { + YQL_CLOG(INFO, HttpGateway) + << "New resolution table contains changes compared current one. " + "Going to generate new Curl list"; + DnsResolutionTable = std::move(newResolutionTable); + SetDNSCurlList(ConvertDNSTableToCurlList()); + } else { + YQL_CLOG(DEBUG, HttpGateway) + << "New resolution table same as current " + "one. No update is required"; + } + } + + std::vector<TString> ResolveHostname( + const TString& host, + ui16 port, + TExplicitDNSRecord::ProtocolVersion protocolVersion) const { + + TotalResolutionCounter->Inc(); + std::vector<TString> result; + try { + const std::vector<NAddr::TOpaqueAddr> addresses = DnsResolver.Resolve(host, port); + for (auto& addr: addresses) { + switch (addr.Addr()->sa_family) { + case AF_INET: + if (protocolVersion == + TExplicitDNSRecord_ProtocolVersion_ANY || + protocolVersion == + TExplicitDNSRecord_ProtocolVersion_IPV4) { + result.emplace_back(NAddr::PrintHost(addr)); + } else { + YQL_CLOG(WARN, HttpGateway) + << "Discarding IPV4 address as other protocol version was " + "configured for hostname: " + << host << ":" << port; + } + break; + + case AF_INET6: + if (protocolVersion == + TExplicitDNSRecord_ProtocolVersion_ANY || + protocolVersion == + TExplicitDNSRecord_ProtocolVersion_IPV6) { + result.emplace_back(NAddr::PrintHost(addr)); + } else { + YQL_CLOG(WARN, HttpGateway) + << "Discarding IPV6 address as other protocol version was " + "configured for hostname: " + << host << ":" << port; + } + break; + } + } + } catch (const TNetworkResolutionError& e) { + YQL_CLOG(ERROR, HttpGateway) + << "An exception was raised during DNS " + "resolution for hostname: '" + << host << ":" << port << "' with error:" << e.AsStrBuf(); + ResolutionErrorCounter->Inc(); + return result; + } + + if (result.empty()) { + YQL_CLOG(WARN, HttpGateway) + << "No IPV4 or IPV6 address was recieved as a result of DNS " + "resolution for hostname: " + << host << ":" << port; + ResolutionErrorCounter->Inc(); + } else { + ResolutionSuccessCounter->Inc(); + } + + return result; + } + + void StopThread() { + { + auto lock = std::lock_guard{Sync}; + if (IsStopped) { + return; + } + IsStopped = true; + } + + IsStoppedConditionalVariable.notify_all(); + YQL_CLOG(DEBUG, HttpGateway) + << "Requested DNS gateway thread termination"; + if (Thread.joinable()) { + Thread.join(); + } + YQL_CLOG(DEBUG, HttpGateway) + << "DNS Gateway thread termination finished"; + } + + std::shared_ptr<curl_slist> ConvertDNSTableToCurlList() const { + curl_slist* dnsRecords = nullptr; + for (const auto& [hostname, addresses]: DnsResolutionTable) { + auto curlDnsRecord = TStringBuilder() << hostname << ":"; + + bool isFirst = true; + for (const auto& address: addresses) { + if (isFirst) { + isFirst = false; + } else { + curlDnsRecord << ","; + } + curlDnsRecord << address; + } + + YQL_CLOG(INFO, HttpGateway) + << "Adding new DNS entry: " << curlDnsRecord; + + dnsRecords = curl_slist_append(dnsRecords, curlDnsRecord.c_str()); + } + return std::shared_ptr<curl_slist>{dnsRecords, &curl_slist_free_all}; + } + +private: + std::mutex Sync; + std::thread Thread; + std::condition_variable IsStoppedConditionalVariable; + bool IsStopped = false; + + std::chrono::milliseconds UpdateInterval; + std::vector<TResolutionTask> HostAddressedToResolve; + + Resolver DnsResolver; + TDNSResolutionTable DnsResolutionTable; + TDNSCurlListPtr DnsCurlList; + + const ::NMonitoring::TDynamicCounters::TCounterPtr TotalResolutionCounter; + const ::NMonitoring::TDynamicCounters::TCounterPtr ResolutionSuccessCounter; + const ::NMonitoring::TDynamicCounters::TCounterPtr ResolutionErrorCounter; + const ::NMonitoring::TDynamicCounters::TCounterPtr ResolvedToNotExpectedIP; +}; + +} // namespace NYql diff --git a/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway_ut.cpp b/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway_ut.cpp new file mode 100644 index 0000000000..1a7bfd4928 --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/yql_dns_gateway_ut.cpp @@ -0,0 +1,192 @@ +#include "yql_dns_gateway.h" + +#include <util/network/address.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NYql { + +namespace { + +using TDnsResponse = std::vector<NAddr::TOpaqueAddr>; + +NAddr::TOpaqueAddr Ipv4Addr(const std::string& ip, ui16 port) { + NAddr::TIPv4Addr ipv4(TIpAddress(IpFromString(ip.c_str()), port)); + return NAddr::TOpaqueAddr(&ipv4); +} + +NAddr::TOpaqueAddr Ipv6Addr(const std::string& ip, ui16 port) { + sockaddr_in6 ipv6_sock = {}; + ipv6_sock.sin6_family = AF_INET6; + inet_pton(AF_INET6, ip.c_str(), &(ipv6_sock.sin6_addr)); + ipv6_sock.sin6_port = HostToInet(port); + NAddr::TIPv6Addr ipv6(ipv6_sock); + return NAddr::TOpaqueAddr(&ipv6); +} + +class TTestDNSTable { +public: + using TKey = std::pair<TString, ui16>; + + TTestDNSTable() = default; + + TTestDNSTable(std::initializer_list<std::pair<TKey, TDnsResponse>> l) + : DNSMapping(l.begin(), l.end()) { } + + std::vector<NAddr::TOpaqueAddr> Resolve(const TString& host, ui16 port) const { + auto key = std::make_pair(host, port); + + auto valueIt = DNSMapping.find(key); + if (valueIt == DNSMapping.end()) { + throw TNetworkResolutionError(10); + } + return valueIt->second; + } + +private: + std::unordered_map<TKey, TDnsResponse, NHashPrivate::TPairHash<TString, ui16>> DNSMapping; +}; + +class TTestDNSTableProxy { +public: + std::vector<NAddr::TOpaqueAddr> Resolve( + const TString& host, ui16 port) const { + return getInstance().Resolve(host, port); + } + + static const TTestDNSTable& getInstance() { return instance; } + + static void setInstance(const TTestDNSTable& table) { instance = table; } + +private: + static TTestDNSTable instance; +}; + +TTestDNSTable TTestDNSTableProxy::instance; +} // namespace + +Y_UNIT_TEST_SUITE(TDNSGatewaySuite) { + auto CreateDNSConfig(const TResolutionTask& input) { + auto config = std::make_unique<TDnsResolverConfig>(); + auto record = config->AddExplicitDNSRecord(); + record->SetAddress(input.Host); + record->SetPort(input.Port); + record->SetExpectedIP(input.ExpectedIP.c_str()); + record->SetProtocol(input.ProtocolVersion); + return config; + } + + void ValidateDNSCacheContent( + const curl_slist* actualDNSCacheContent, + const std::unordered_set<std::string>& expectedDNSCacheContent) { + + auto dnsItemsCount = 0; + auto it = actualDNSCacheContent; + while (it) { + dnsItemsCount++; + auto actualDNSCacheRecord = std::string(it->data); + UNIT_ASSERT(expectedDNSCacheContent.contains(actualDNSCacheRecord)); + it = it->next; + } + UNIT_ASSERT_VALUES_EQUAL(dnsItemsCount, expectedDNSCacheContent.size()); + } + + void RunTest( + const TTestDNSTable& dnsResolver, + const TResolutionTask& input, + const std::unordered_set<std::string>& expectedDNSCacheContent) { + // Setup + TTestDNSTableProxy::setInstance(std::move(dnsResolver)); + TIntrusivePtr<NMonitoring::TDynamicCounters> counters = + new NMonitoring::TDynamicCounters; + auto configPtr = CreateDNSConfig(input); + + // Execution + auto dnsGateway = + TDNSGateway<TTestDNSTableProxy>(*configPtr.get(), counters); + + // Validate results + if (expectedDNSCacheContent.empty()) { + UNIT_ASSERT(dnsGateway.GetDNSCurlList() == nullptr); + } else { + UNIT_ASSERT(dnsGateway.GetDNSCurlList() != nullptr); + ValidateDNSCacheContent( + dnsGateway.GetDNSCurlList().get(), + expectedDNSCacheContent); + } + // TearDown + TTestDNSTableProxy::setInstance(TTestDNSTable{}); + } + + Y_UNIT_TEST(ShouldResolveHostnameFromDNSDuringInitialization) { + auto dnsTable = TTestDNSTable{std::make_pair( + std::make_pair("localhost", 443), + std::vector<NAddr::TOpaqueAddr>{Ipv4Addr("127.0.0.1", 443)})}; + auto inputEntry = TResolutionTask{ + "localhost", + 443, + "127.0.0.2", + TExplicitDNSRecord_ProtocolVersion_ANY}; + auto expectedDNSCacheContent = + std::unordered_set<std::string>{"localhost:443:127.0.0.1"}; + RunTest(dnsTable, inputEntry, expectedDNSCacheContent); + } + + Y_UNIT_TEST(ShouldUsePreviouslyKnownResolutionIfDNSIsNotResponding) { + auto dnsTable = TTestDNSTable{{}}; + auto inputEntry = TResolutionTask{ + "localhost", + 443, + "127.0.0.2", + TExplicitDNSRecord_ProtocolVersion_ANY}; + auto expectedDNSCacheContent = + std::unordered_set<std::string>{"localhost:443:127.0.0.2"}; + RunTest(dnsTable, inputEntry, expectedDNSCacheContent); + } + + Y_UNIT_TEST(ShouldFilterDNSAddressedBasedOnProvidedProtocolIPV4Case) { + auto dnsTable = TTestDNSTable{std::make_pair( + std::make_pair("localhost", 443), + std::vector<NAddr::TOpaqueAddr>{ + Ipv4Addr("127.0.0.1", 443), Ipv6Addr("127.0.0.1", 443)})}; + auto inputEntry = TResolutionTask{ + "localhost", + 443, + "127.0.0.1", + TExplicitDNSRecord_ProtocolVersion_IPV4}; + auto expectedDNSCacheContent = + std::unordered_set<std::string>{"localhost:443:127.0.0.1"}; + RunTest(dnsTable, inputEntry, expectedDNSCacheContent); + } + + Y_UNIT_TEST(ShouldFilterDNSAddressedBasedOnProvidedProtocolIPV6Case) { + auto dnsTable = TTestDNSTable{std::make_pair( + std::make_pair("localhost", 443), + std::vector<NAddr::TOpaqueAddr>{ + Ipv4Addr("127.0.0.1", 443), Ipv6Addr("::1", 443)})}; + auto inputEntry = TResolutionTask{ + "localhost", + 443, + "127.0.0.1", + TExplicitDNSRecord_ProtocolVersion_IPV6}; + auto expectedDNSCacheContent = + std::unordered_set<std::string>{"localhost:443:::1"}; + RunTest(dnsTable, inputEntry, expectedDNSCacheContent); + } + + Y_UNIT_TEST(ShouldFilterDNSAddressedBasedOnProvidedProtocolANYCase) { + auto dnsTable = TTestDNSTable{std::make_pair( + std::make_pair("localhost", 443), + std::vector<NAddr::TOpaqueAddr>{ + Ipv4Addr("127.0.0.1", 443), Ipv6Addr("::1", 443)})}; + auto inputEntry = TResolutionTask{ + "localhost", + 443, + "127.0.0.1", + TExplicitDNSRecord_ProtocolVersion_ANY}; + auto expectedDNSCacheContent = + std::unordered_set<std::string>{"localhost:443:127.0.0.1,::1"}; + RunTest(dnsTable, inputEntry, expectedDNSCacheContent); + } + +} // Y_UNIT_TEST_SUITE(TDNSGatewaySuite) +} // namespace NYql diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 6054404ea4..5cf99777a1 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -1,5 +1,7 @@ #include "yql_http_gateway.h" +#include "yql_dns_gateway.h" +#include <ydb/library/yql/utils/log/log.h> #include <contrib/libs/curl/include/curl/curl.h> #include <util/stream/str.h> #include <util/string/builder.h> @@ -81,8 +83,8 @@ public: PUT }; - TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t sizeLimit = 0, size_t bodySize = 0, const TCurlInitConfig& config = TCurlInitConfig()) - : Headers(headers), Method(method), Offset(offset), SizeLimit(sizeLimit), BodySize(bodySize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), ErrorBuffer(static_cast<size_t>(CURL_ERROR_SIZE), '\0'), Url(url) + TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t sizeLimit = 0, size_t bodySize = 0, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) + : Headers(headers), Method(method), Offset(offset), SizeLimit(sizeLimit), BodySize(bodySize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), ErrorBuffer(static_cast<size_t>(CURL_ERROR_SIZE), '\0'), DnsCache(dnsCache), Url(url) { InitHandles(); Counter->Inc(); @@ -131,6 +133,10 @@ public: curl_easy_setopt(Handle, CURLOPT_LOW_SPEED_LIMIT, Config.LowSpeedLimit); curl_easy_setopt(Handle, CURLOPT_ERRORBUFFER, ErrorBuffer.data()); + if (DnsCache != nullptr) { + curl_easy_setopt(Handle, CURLOPT_RESOLVE, DnsCache.get()); + } + if (!Headers.empty()) { CurlHeaders = std::accumulate(Headers.cbegin(), Headers.cend(), CurlHeaders, std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2))); @@ -230,6 +236,7 @@ private: const ::NMonitoring::TDynamicCounters::TCounterPtr UploadedBytes; const TCurlInitConfig Config; std::vector<char> ErrorBuffer; + TDNSGateway<>::TDNSConstCurlListPtr DnsCache; public: TString Url; }; @@ -239,15 +246,15 @@ public: using TPtr = std::shared_ptr<TEasyCurlBuffer>; using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; - TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, sizeLimit, data.size(), std::move(config)), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState)) + TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, sizeLimit, data.size(), std::move(config), std::move(dnsCache)), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState)) { Output.Reserve(sizeLimit); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) { - return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config)); + static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) { + return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config), std::move(dnsCache)); } // return true if callback successfully added to this work @@ -341,8 +348,9 @@ public: IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, - const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, sizeLimit, 0ULL, std::move(config)) + const TCurlInitConfig& config = TCurlInitConfig(), + TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, sizeLimit, 0ULL, std::move(config), std::move(dnsCache)) , OnStart(std::move(onStart)) , OnNewData(std::move(onNewData)) , OnFinish(std::move(onFinish)) @@ -362,9 +370,10 @@ public: IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, - const TCurlInitConfig& config = TCurlInitConfig()) + const TCurlInitConfig& config = TCurlInitConfig(), + TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) { - return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config)); + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config), std::move(dnsCache)); } enum class EAction : i8 { @@ -464,7 +473,8 @@ public: explicit THTTPMultiGateway( const THttpGatewayConfig* httpGatewaysCfg, ::NMonitoring::TDynamicCounterPtr counters) - : Counters(std::move(counters)) + : DnsGateway(httpGatewaysCfg ? httpGatewaysCfg->GetDnsResolverConfig(): TDnsResolverConfig{}, counters) + , Counters(std::move(counters)) , Rps(Counters->GetCounter("Requests", true)) , InFlight(Counters->GetCounter("InFlight")) , InFlightStreams(Counters->GetCounter("InFlightStreams")) @@ -723,7 +733,7 @@ private: Rps->Inc(); const std::unique_lock lock(Sync); - auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList()); Await.emplace(std::move(easy)); Wakeup(0U); } @@ -749,7 +759,7 @@ private: if (easy->AddCallback(callback)) return; - auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList()); entry = easy; Await.emplace(std::move(easy)); Wakeup(sizeLimit); @@ -765,7 +775,7 @@ private: TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter); + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList()); const std::unique_lock lock(Sync); const auto handle = stream->GetHandle(); TEasyCurlStream::TWeakPtr weak = stream; @@ -821,6 +831,8 @@ private: static std::mutex CreateSync; static TWeakPtr Singleton; + TDNSGateway<> DnsGateway; + const ::NMonitoring::TDynamicCounterPtr Counters; const ::NMonitoring::TDynamicCounters::TCounterPtr Rps; const ::NMonitoring::TDynamicCounters::TCounterPtr InFlight; diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 65fb7ad5bf..a56120201e 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -53,6 +53,24 @@ enum ETokenType { /////////////////////////////// HTTP GATEWAY ////////////////////// +message TExplicitDNSRecord { + enum ProtocolVersion { + ANY = 0; + IPV4 = 1; + IPV6 = 2; + }; + + optional string Address = 1 [default = ""]; + optional uint32 Port = 2 [default = 443]; + optional string ExpectedIP = 3 [default = ""]; + optional ProtocolVersion Protocol = 4 [default = ANY]; +} + +message TDnsResolverConfig { + optional uint32 RefreshMs = 1 [default = 60000]; + repeated TExplicitDNSRecord ExplicitDNSRecord = 100; +} + message THttpGatewayConfig { optional uint32 MaxInFlightCount = 1; optional uint64 MaxSimulatenousDownloadsSize = 2; @@ -63,6 +81,7 @@ message THttpGatewayConfig { optional uint64 RequestTimeoutSeconds = 7; optional uint64 LowSpeedTimeSeconds = 8; optional uint64 LowSpeedBytesLimit = 9; + optional TDnsResolverConfig DnsResolverConfig = 10; } /////////////////////////////// YT /////////////////////////////// diff --git a/ydb/library/yql/utils/log/log_component.h b/ydb/library/yql/utils/log/log_component.h index d7a1e10ca1..4d82109216 100644 --- a/ydb/library/yql/utils/log/log_component.h +++ b/ydb/library/yql/utils/log/log_component.h @@ -32,6 +32,7 @@ enum class EComponent { ProviderPq, ProviderS3, CoreDq, + HttpGateway, // <--- put other log components here MaxValue }; @@ -73,6 +74,7 @@ struct EComponentHelpers { case EComponent::ProviderPq: return TStringBuf("PQ"); case EComponent::ProviderS3: return TStringBuf("S3"); case EComponent::CoreDq: return TStringBuf("core dq"); + case EComponent::HttpGateway: return TStringBuf("http gw"); default: ythrow yexception() << "invalid log component value: " << ToInt(component); @@ -103,6 +105,7 @@ struct EComponentHelpers { if (str == TStringBuf("PQ")) return EComponent::ProviderPq; if (str == TStringBuf("S3")) return EComponent::ProviderS3; if (str == TStringBuf("core dq")) return EComponent::CoreDq; + if (str == TStringBuf("http gw")) return EComponent::HttpGateway; ythrow yexception() << "unknown log component: '" << str << '\''; } |