aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/http/client/fetch
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/http/client/fetch
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
downloadydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/cpp/http/client/fetch')
-rw-r--r--library/cpp/http/client/fetch/coctx.cpp1
-rw-r--r--library/cpp/http/client/fetch/coctx.h50
-rw-r--r--library/cpp/http/client/fetch/codes.h36
-rw-r--r--library/cpp/http/client/fetch/cosocket.h97
-rw-r--r--library/cpp/http/client/fetch/fetch_request.cpp114
-rw-r--r--library/cpp/http/client/fetch/fetch_request.h65
-rw-r--r--library/cpp/http/client/fetch/fetch_result.cpp32
-rw-r--r--library/cpp/http/client/fetch/fetch_result.h40
-rw-r--r--library/cpp/http/client/fetch/fetch_single.cpp205
-rw-r--r--library/cpp/http/client/fetch/fetch_single.h88
-rw-r--r--library/cpp/http/client/fetch/parse.cpp160
-rw-r--r--library/cpp/http/client/fetch/parse.h14
-rw-r--r--library/cpp/http/client/fetch/pool.cpp57
-rw-r--r--library/cpp/http/client/fetch/pool.h41
14 files changed, 1000 insertions, 0 deletions
diff --git a/library/cpp/http/client/fetch/coctx.cpp b/library/cpp/http/client/fetch/coctx.cpp
new file mode 100644
index 0000000000..dfbb6943a3
--- /dev/null
+++ b/library/cpp/http/client/fetch/coctx.cpp
@@ -0,0 +1 @@
+#include "coctx.h"
diff --git a/library/cpp/http/client/fetch/coctx.h b/library/cpp/http/client/fetch/coctx.h
new file mode 100644
index 0000000000..bd1b61cb59
--- /dev/null
+++ b/library/cpp/http/client/fetch/coctx.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include <library/cpp/coroutine/engine/impl.h>
+
+#include <util/thread/singleton.h>
+
+namespace NAsyncDns {
+ class TContResolver;
+ class TContDnsCache;
+}
+
+namespace NHttpFetcher {
+ struct TCoCtx { // Bundle of raw pointers (never deleted here) that coroutine code needs: executor, DNS resolver, optional DNS cache.
+ TContExecutor* Executor;
+ NAsyncDns::TContResolver* Resolver;
+ NAsyncDns::TContDnsCache* DnsCache; // nullptr when the constructor default is used
+
+ TCoCtx(TContExecutor* executor, NAsyncDns::TContResolver* resolver, NAsyncDns::TContDnsCache* dnsCache = nullptr)
+ : Executor(executor)
+ , Resolver(resolver)
+ , DnsCache(dnsCache)
+ {
+ }
+
+ TCont* Cont() { // Returns Executor->Running(); Executor is dereferenced without a null check.
+ return Executor->Running();
+ }
+ };
+
+ inline TCoCtx*& CoCtx() { // Mutable reference to the TCoCtx* slot; presumably one slot per thread (FastTls singleton) - confirm.
+ return *FastTlsSingletonWithPriority<TCoCtx*, 0>();
+ }
+
+ class TCoCtxSetter { // Scope guard: publishes its own TCoCtx through CoCtx() on construction and clears the slot on destruction.
+ public:
+ TCoCtxSetter(TContExecutor* executor, NAsyncDns::TContResolver* resolver, NAsyncDns::TContDnsCache* dnsCache = nullptr)
+ : Instance(executor, resolver, dnsCache)
+ {
+ Y_VERIFY(!CoCtx(), "coCtx already exists"); // nesting two setters on the same slot is rejected
+ CoCtx() = &Instance;
+ }
+
+ ~TCoCtxSetter() {
+ CoCtx() = nullptr; // NOTE(review): the class is implicitly copyable; destroying a copy would also clear the slot.
+ }
+
+ private:
+ TCoCtx Instance; // the context exposed via CoCtx() for the lifetime of this object
+ };
+}
diff --git a/library/cpp/http/client/fetch/codes.h b/library/cpp/http/client/fetch/codes.h
new file mode 100644
index 0000000000..25d09c88f8
--- /dev/null
+++ b/library/cpp/http/client/fetch/codes.h
@@ -0,0 +1,36 @@
+#pragma once
+
+namespace NHttpFetcher {
+ const int FETCH_SUCCESS_CODE = 200; // the only code TResult::Success() accepts
+ const int SERVICE_UNAVAILABLE = 503;
+ const int ZORA_TIMEOUT_CODE = 5000; // values from 5000 up are outside the HTTP status range (fetcher-internal codes)
+ const int URL_FILTER_CODE = 6000;
+ const int WRONG_HTTP_HEADER_CODE = 6001; // set by ParseHttpResponse when the status line cannot be parsed
+ const int FETCH_LARGE_FILE = 6002;
+ const int FETCH_CANNOT_PARSE = 6003;
+ const int HOSTS_QUEUE_TIMEOUT = 6004;
+ const int WRONG_HTTP_RESPONSE = 6005; // set by ParseHttpResponse when reading headers/body throws yexception
+ const int UNKNOWN_ERROR = 6006; // set by FetchSingleImpl when any exception escapes the fetch
+ const int FETCHER_QUEUE_TIMEOUT = 6007;
+ const int FETCH_IGNORE = 6008; // NoRefetch() treats this code as permanent
+ const int FETCH_CANCELLED = 6009;
+
+ inline bool IsRedirectCode(int code) { // True for 301, 302, 303, 305, 307 and 308 only; other 3xx codes (e.g. 300, 304) are not redirects here.
+ return 301 == code || 302 == code || 303 == code ||
+ 305 == code || 307 == code || 308 == code;
+ }
+
+ inline bool IsSuccessCode(int code) { // True for any code in [200, 300); note TResult::Success() is stricter (exactly 200).
+ return code >= 200 && code < 300;
+ }
+
+ inline bool NoRefetch(int code) { // True for codes after which a refetch should not be attempted (per the function name).
+ return code == 415 || // Unsupported media type (already covered by the 4xx range below; kept for readability)
+ code == 601 || // Large file
+ (code >= 400 && code < 500) ||
+ code == 1003 || // disallowed by robots.txt
+ code == 1006 || // not found by dns server
+ code == FETCH_IGNORE; // once ignored, always ignored (named constant instead of the literal 6008)
+ }
+
+}
diff --git a/library/cpp/http/client/fetch/cosocket.h b/library/cpp/http/client/fetch/cosocket.h
new file mode 100644
index 0000000000..8230d36bbe
--- /dev/null
+++ b/library/cpp/http/client/fetch/cosocket.h
@@ -0,0 +1,97 @@
+#pragma once
+
+#include "coctx.h"
+
+#include <library/cpp/coroutine/engine/network.h>
+#include <library/cpp/http/fetch_gpl/sockhandler.h>
+
+#include <util/system/error.h>
+
+namespace NHttpFetcher {
+ class TCoSocketHandler { // Socket handler whose connect/send/peek/read go through NCoro calls on the coroutine returned by CoCtx()->Cont().
+ public:
+ TCoSocketHandler() = default;
+
+ ~TCoSocketHandler() {
+ Disconnect(); // shuts down and closes Fd if it is still open
+ }
+
+ int Good() const { // Non-zero while Fd holds a valid socket.
+ return (Fd != INVALID_SOCKET);
+ }
+
+ int Connect(const TAddrList& addrs, TDuration timeout) { // Tries addresses in order; returns 0 on the first successful connect, a non-zero error code otherwise.
+ TCont* cont = CoCtx()->Cont();
+ Timeout = timeout; // remembered: send(), peek() and read() reuse the same timeout
+ for (const auto& item : addrs) {
+ try {
+ const sockaddr* sa = item->Addr();
+ TSocketHolder s(NCoro::Socket(sa->sa_family, SOCK_STREAM, 0));
+ if (s.Closed()) { // socket creation failed - try the next address
+ continue;
+ }
+ int err = NCoro::ConnectT(cont, s, sa, item->Len(), Timeout);
+ if (err) {
+ s.Close();
+ errno = err; // restored after Close() so the connect error is the one reported below
+ continue;
+ }
+ SetZeroLinger(s);
+ SetKeepAlive(s, true);
+ Fd.Swap(s); // take ownership of the connected socket
+ return 0;
+ } catch (const TSystemError&) { // best effort: an exception for one address just moves on to the next
+ }
+ }
+ return errno ? errno : EBADF; // NOTE(review): errno is not reset on entry, so a stale value may be reported (e.g. for an empty addrs list).
+ }
+
+ void Disconnect() { // Shuts down both directions (errors ignored) and closes the socket; no-op when already closed.
+ if (Fd.Closed())
+ return;
+ try {
+ ShutDown(Fd, SHUT_RDWR);
+ } catch (const TSystemError&) {
+ }
+ Fd.Close();
+ }
+
+ void shutdown() { // Shuts down the write side only; the socket stays open. TSystemError is swallowed.
+ try {
+ ShutDown(Fd, SHUT_WR);
+ } catch (TSystemError&) {
+ }
+ }
+
+ ssize_t send(const void* message, size_t messlen) { // Returns bytes processed, or -1 with errno set to the NCoro status.
+ TCont* cont = CoCtx()->Cont();
+ TContIOStatus status = NCoro::WriteT(cont, Fd, message, messlen, Timeout);
+ errno = status.Status();
+ return status.Status() ? -1 : (ssize_t)status.Processed();
+ }
+
+ bool peek() { // True when one byte can be peeked after the socket polls readable within Timeout; nothing is consumed.
+ TCont* cont = CoCtx()->Cont();
+ if ((errno = NCoro::PollT(cont, Fd, CONT_POLL_READ, Timeout))) // non-zero poll status is stored in errno and reported as false
+ return false;
+ char buf[1];
+#ifdef _win32_
+ return (1 == ::recv(Fd, buf, 1, MSG_PEEK));
+#else
+ return (1 == ::recv(Fd, buf, 1, MSG_PEEK | MSG_DONTWAIT));
+#endif
+ }
+
+ ssize_t read(void* message, size_t messlen) { // Returns bytes processed, or -1 with errno set to the NCoro status.
+ TCont* cont = CoCtx()->Cont();
+ TContIOStatus status = NCoro::ReadT(cont, Fd, message, messlen, Timeout);
+ errno = status.Status();
+ return status.Status() ? -1 : (ssize_t)status.Processed();
+ }
+
+ protected:
+ TSocketHolder Fd; // owned socket; closed by Disconnect() / the destructor
+ TDuration Timeout; // set by Connect(); default-constructed until then
+ static THolder<TIpAddress> AddrToBind; // declared only; no definition or use is visible in this diff
+ };
+}
diff --git a/library/cpp/http/client/fetch/fetch_request.cpp b/library/cpp/http/client/fetch/fetch_request.cpp
new file mode 100644
index 0000000000..2f8453fc45
--- /dev/null
+++ b/library/cpp/http/client/fetch/fetch_request.cpp
@@ -0,0 +1,114 @@
+#include "fetch_request.h"
+
+#include <library/cpp/deprecated/atomic/atomic.h>
+
+// TRequest
+namespace NHttpFetcher {
+ const TString DEFAULT_ACCEPT_ENCODING = "gzip, deflate"; // initial TRequest::AcceptEncoding in every constructor
+ const size_t DEFAULT_MAX_HEADER_SIZE = 100 << 10; // 102400 bytes (100 KiB)
+ const size_t DEFAULT_MAX_BODY_SIZE = 1 << 29; // 536870912 bytes (512 MiB)
+
+ TRequest::TRequest(const TString& url, TCallBack onFetch) // All-defaults form: deadline = now + DEFAULT_REQUEST_TIMEOUT, default freshness, priority 40.
+ : Url(url)
+ , Deadline(TInstant::Now() + DEFAULT_REQUEST_TIMEOUT)
+ , Freshness(DEFAULT_REQUEST_FRESHNESS)
+ , Priority(40)
+ , IgnoreRobotsTxt(false)
+ , LangRegion(ELR_RU)
+ , OnFetch(onFetch)
+ , AcceptEncoding(DEFAULT_ACCEPT_ENCODING)
+ , OnlyHeaders(false)
+ , MaxHeaderSize(DEFAULT_MAX_HEADER_SIZE)
+ , MaxBodySize(DEFAULT_MAX_BODY_SIZE)
+ {
+ GenerateSequence(); // assigns Sequence, which has no member initializer
+ }
+
+ TRequest::TRequest(const TString& url, bool ignoreRobotsTxt, TDuration timeout, TDuration freshness, TCallBack onFetch) // Caller picks robots handling, timeout and freshness; the rest stays default.
+ : Url(url)
+ , Deadline(Now() + timeout) // timeout is relative; stored as an absolute deadline
+ , Freshness(freshness)
+ , Priority(40)
+ , IgnoreRobotsTxt(ignoreRobotsTxt)
+ , LangRegion(ELR_RU)
+ , OnFetch(onFetch)
+ , AcceptEncoding(DEFAULT_ACCEPT_ENCODING)
+ , OnlyHeaders(false)
+ , MaxHeaderSize(DEFAULT_MAX_HEADER_SIZE)
+ , MaxBodySize(DEFAULT_MAX_BODY_SIZE)
+ {
+ GenerateSequence(); // assigns Sequence, which has no member initializer
+ }
+
+ TRequest::TRequest(const TString& url, TDuration timeout, TDuration freshness, bool ignoreRobots, // Fullest form: also sets priority, credentials and language region.
+ size_t priority, const TMaybe<TString>& login, const TMaybe<TString>& password,
+ ELangRegion langRegion, TCallBack onFetch)
+ : Url(url)
+ , Deadline(Now() + timeout) // timeout is relative; stored as an absolute deadline
+ , Freshness(freshness)
+ , Priority(priority)
+ , Login(login)
+ , Password(password)
+ , IgnoreRobotsTxt(ignoreRobots)
+ , LangRegion(langRegion)
+ , OnFetch(onFetch)
+ , AcceptEncoding(DEFAULT_ACCEPT_ENCODING)
+ , OnlyHeaders(false)
+ , MaxHeaderSize(DEFAULT_MAX_HEADER_SIZE)
+ , MaxBodySize(DEFAULT_MAX_BODY_SIZE)
+ {
+ GenerateSequence(); // assigns Sequence, which has no member initializer
+ }
+
+ void TRequest::GenerateSequence() { // Stores the next value of a counter shared by all TRequest objects into Sequence.
+ static TAtomic nextSeq = 0; // function-local static: one counter for the whole process
+ Sequence = AtomicIncrement(nextSeq); // presumably returns the incremented value, so ids start at 1 - confirm
+ }
+
+ TRequestRef TRequest::Clone() { // Copy of this request that differs only in its freshly generated Sequence.
+ THolder<TRequest> request = THolder<TRequest>(new TRequest(*this)); // copy constructor is private, hence the explicit new
+ request->GenerateSequence(); // the copy must not share the original's id
+ return request.Release(); // ownership passes from the THolder to the returned TRequestRef
+ }
+
+ void TRequest::Dump(IOutputStream& out) { // Writes the request as "name: value" lines; optional fields appear only when set.
+ out << "url: " << Url << "\n";
+ out << "timeout: " << (Deadline - Now()).MilliSeconds() << " ms\n"; // time remaining until Deadline, not the original timeout
+ out << "freshness: " << Freshness.Seconds() << "\n";
+ out << "priority: " << Priority << "\n";
+ if (!!Login) {
+ out << "login: " << *Login << "\n";
+ }
+ if (!!Password) {
+ out << "password: " << *Password << "\n"; // NOTE(review): password is written in clear text - avoid dumping into shared logs
+ }
+ if (!!OAuthToken) {
+ out << "oauth token: " << *OAuthToken << "\n"; // NOTE(review): token is written in clear text as well
+ }
+ if (IgnoreRobotsTxt) {
+ out << "ignore robots: " << IgnoreRobotsTxt << "\n";
+ }
+ out << "lang reg: " << LangRegion2Str(LangRegion) << "\n";
+ if (!!CustomHost) {
+ out << "custom host: " << *CustomHost << "\n";
+ }
+ if (!!UserAgent) {
+ out << "user agent: " << *UserAgent << "\n";
+ }
+ if (!!AcceptEncoding) {
+ out << "accept enc: " << *AcceptEncoding << "\n";
+ }
+ if (OnlyHeaders) {
+ out << "only headers: " << OnlyHeaders << "\n";
+ }
+ out << "max header sz: " << MaxHeaderSize << "\n";
+ out << "max body sz: " << MaxBodySize << "\n";
+ if (!!PostData) {
+ out << "post data: " << *PostData << "\n";
+ }
+ if (!!ContentType) {
+ out << "content type: " << *ContentType << "\n";
+ }
+ } // fields not dumped: UnixSocketPath, RdWrTimeout, ConnectTimeout, Sequence, Method, ExtraHeaders, callbacks
+
+}
diff --git a/library/cpp/http/client/fetch/fetch_request.h b/library/cpp/http/client/fetch/fetch_request.h
new file mode 100644
index 0000000000..169c2940d7
--- /dev/null
+++ b/library/cpp/http/client/fetch/fetch_request.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include "fetch_result.h"
+
+#include <kernel/langregion/langregion.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/ptr.h>
+
+namespace NHttpFetcher {
+ const TDuration DEFAULT_REQUEST_TIMEOUT = TDuration::Minutes(1); // added to "now" to form TRequest::Deadline in the default constructor
+ const TDuration DEFAULT_REQUEST_FRESHNESS = TDuration::Seconds(10000); // initial TRequest::Freshness in the default constructor
+
+ class TRequest;
+ using TRequestRef = TIntrusivePtr<TRequest>; // ref-counted handle; TRequest derives from TAtomicRefCount
+
+ class TRequest: public TAtomicRefCount<TRequest> { // Description of one fetch: target, timing, credentials, limits and callbacks.
+ private:
+ TRequest(const TRequest&) = default; // copying is private; use Clone() to get a copy with a new Sequence
+ TRequest& operator=(const TRequest&) = default;
+ void GenerateSequence();
+
+ public:
+ TRequest(const TString& url = "", TCallBack onFetch = TCallBack());
+ TRequest(const TString& url, bool ignoreRobotsTxt, TDuration timeout,
+ TDuration freshness, TCallBack onFetch = TCallBack());
+ TRequest(const TString& url, TDuration timeout, TDuration freshness, bool ignoreRobots,
+ size_t priority, const TMaybe<TString>& login = TMaybe<TString>(),
+ const TMaybe<TString>& password = TMaybe<TString>(),
+ ELangRegion langRegion = ELR_RU, TCallBack onFetch = TCallBack());
+ void Dump(IOutputStream& out);
+ TRequestRef Clone();
+
+ public:
+ TString Url;
+ TMaybe<TString> UnixSocketPath; // when set and non-empty, FetchSingleImpl connects to this unix socket instead of resolving the host
+
+ TInstant Deadline; // [default = now + 1 min]
+ TDuration RdWrTimeout; // zero means "use the time left until Deadline" in FetchSingleImpl
+ TMaybe<TDuration> ConnectTimeout;
+ TDuration Freshness; // [default = 10000 sec]
+ size_t Priority; // lower is more important; range [0, 100], default 40
+
+ TMaybe<TString> Login;
+ TMaybe<TString> Password;
+ TMaybe<TString> OAuthToken;
+ bool IgnoreRobotsTxt; // [default = false]
+ ELangRegion LangRegion; // [default = ELR_RU]
+ TCallBack OnFetch; // for async requests
+ ui64 Sequence; // unique id
+ TMaybe<TString> CustomHost; // Use custom host for "Host" header
+ TMaybe<TString> UserAgent; // custom user agent, [default = YandexNews]
+ TMaybe<TString> AcceptEncoding; // custom accept encoding, [default = "gzip, deflate"]
+ bool OnlyHeaders; // [default = false], if true - no content will be fetched (HEAD request)
+ size_t MaxHeaderSize; // returns 1002 error if exceeded
+ size_t MaxBodySize; // returns 1002 error if exceeded
+ TNeedDataCallback NeedDataCallback; // set this callback if you need to check data while fetching
+ // true - continue fetching, false - stop
+ TMaybe<TString> Method; // for http exotics like "PUT ", "PATCH ", "DELETE ". If not set, GET or POST will be used
+ TMaybe<TString> PostData; // if exists - send post request
+ TMaybe<TString> ContentType; // custom content-type for post requests
+ // [default = "application/x-www-form-urlencoded"]
+ TVector<TString> ExtraHeaders; // needed for some servers (to auth, for ex.); don't forget to add "\r\n"!
+ };
+}
diff --git a/library/cpp/http/client/fetch/fetch_result.cpp b/library/cpp/http/client/fetch/fetch_result.cpp
new file mode 100644
index 0000000000..0ba1b1e6be
--- /dev/null
+++ b/library/cpp/http/client/fetch/fetch_result.cpp
@@ -0,0 +1,32 @@
+#include "codes.h"
+#include "fetch_result.h"
+
+#include <library/cpp/charset/recyr.hh>
+
+namespace NHttpFetcher {
+ TResult::TResult(const TString& url, int code) // ResolvedUrl starts equal to the request url; ParseHttpResponse may overwrite it on a redirect.
+ : RequestUrl(url)
+ , ResolvedUrl(url)
+ , Code(code)
+ , ConnectionReused(false)
+ {
+ }
+
+ TString TResult::DecodeData(bool* decoded) const { // Returns Data recoded to UTF-8 when Encoding is known and not UTF-8, otherwise Data unchanged.
+ if (!!Encoding && *Encoding != CODES_UTF8) {
+ if (decoded) { // decoded is optional; when given it reports whether recoding happened
+ *decoded = true;
+ }
+ return Recode(*Encoding, CODES_UTF8, Data);
+ }
+ if (decoded) {
+ *decoded = false;
+ }
+ return Data;
+ }
+
+ bool TResult::Success() const { // True only for Code == 200; other 2xx codes are not counted as success here.
+ return Code == FETCH_SUCCESS_CODE;
+ }
+
+}
diff --git a/library/cpp/http/client/fetch/fetch_result.h b/library/cpp/http/client/fetch/fetch_result.h
new file mode 100644
index 0000000000..24fe49e1f6
--- /dev/null
+++ b/library/cpp/http/client/fetch/fetch_result.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <library/cpp/charset/doccodes.h>
+#include <library/cpp/http/io/headers.h>
+#include <library/cpp/langs/langs.h>
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
+#include <functional>
+
+namespace NHttpFetcher {
+ // Result
+ using TResultRef = TIntrusivePtr<struct TResult>; // ref-counted handle to a fetch result
+ struct TResult: public TAtomicRefCount<TResult> { // Outcome of one fetch: status code, headers, body and derived metadata.
+ TResult(const TString& url, int code = 0);
+ TString DecodeData(bool* decoded = nullptr) const;
+ bool Success() const;
+
+ public:
+ TString RequestUrl; // url the result was created for
+ TString ResolvedUrl; // starts as RequestUrl; replaced by the Location target on a redirect
+ TString Location; // raw Location header value, filled only for redirect codes
+ int Code; // HTTP status or one of the fetcher-specific codes from codes.h
+ bool ConnectionReused; // set by FetchSingleImpl when the pooled socket it picked was actually used
+ TString StatusStr;
+ TString MimeType; // from Content-Type, or guessed from the url extension by ProcessHeaders
+ THttpHeaders Headers;
+ TString Data; // response body as read by ParseHttpResponse
+ TMaybe<ECharset> Encoding; // charset from Content-Type, when recognised
+ TMaybe<ELanguage> Language;
+ TVector<TResultRef> Redirects;
+ TString HttpVersion; // first token of the status line
+ };
+
+ using TCallBack = std::function<void(TResultRef)>; // completion callback type (TRequest::OnFetch)
+ using TNeedDataCallback = std::function<bool(const TString&)>; // called with the data received so far; returning false stops the fetch
+}
diff --git a/library/cpp/http/client/fetch/fetch_single.cpp b/library/cpp/http/client/fetch/fetch_single.cpp
new file mode 100644
index 0000000000..27d0888b80
--- /dev/null
+++ b/library/cpp/http/client/fetch/fetch_single.cpp
@@ -0,0 +1,205 @@
+#include "codes.h"
+#include "fetch_single.h"
+#include "parse.h"
+
+#include <library/cpp/string_utils/base64/base64.h>
+
+#include <util/string/ascii.h>
+#include <util/string/cast.h>
+#include <library/cpp/string_utils/url/url.h>
+
+namespace NHttpFetcher {
+ static sockaddr_in6 IPv6Loopback() { // Zero-initialised sockaddr_in6 for the IPv6 loopback address; the port is left 0 for the caller to set.
+ sockaddr_in6 sock = {};
+ sock.sin6_family = AF_INET6;
+ sock.sin6_addr = IN6ADDR_LOOPBACK_INIT;
+ return sock;
+ }
+
+ class THeaders { // Builds the nullptr-terminated array of extra header lines passed to the fetcher; owns the strings it points to.
+ public:
+ inline void Build(const TRequestRef& request) { // Fills Headers_ from the request; intended to be called once per object.
+ if (!!request->Password || !!request->Login) {
+ TString pass;
+ TString login;
+ TString encoded;
+
+ if (!!request->Password) {
+ pass = *request->Password;
+ }
+ if (!!request->Login) {
+ login = *request->Login;
+ }
+ const TString raw = TString::Join(login, ":", pass); // a missing login or password contributes an empty part
+ Base64Encode(raw, encoded);
+ BasicAuth_ = TString::Join("Authorization: Basic ", encoded, "\r\n");
+ Headers_.push_back(BasicAuth_.c_str());
+ }
+
+ if (request->ExtraHeaders) {
+ Headers_.reserve(Headers_.size() + request->ExtraHeaders.size());
+ Fixed_.reserve(Fixed_.size() + request->ExtraHeaders.size()); // no reallocation below, so stored c_str() pointers stay valid
+ for (const TString& header : request->ExtraHeaders) {
+ if (AsciiHasSuffixIgnoreCase(header, "\r\n")) {
+ Headers_.push_back(header.c_str()); // points into the request's own string
+ } else {
+ Fixed_.push_back(header + "\r\n"); // terminated copy is kept alive in Fixed_
+ Headers_.push_back(Fixed_.back().c_str());
+ }
+ }
+ }
+
+ if (!!request->OAuthToken) {
+ OAuth_ = TString::Join("Authorization: OAuth ", *request->OAuthToken, "\r\n");
+ Headers_.push_back(OAuth_.c_str());
+ }
+
+ if (!!request->AcceptEncoding) {
+ AcceptEncoding_ = TString::Join("Accept-Encoding: ", *request->AcceptEncoding, "\r\n");
+ Headers_.push_back(AcceptEncoding_.c_str());
+ }
+
+ ContentType_ = "application/x-www-form-urlencoded"; // default, used only when PostData is present
+ if (!!request->PostData) {
+ if (!!request->ContentType) {
+ ContentType_ = *request->ContentType;
+ }
+ ContentLength_ = TString::Join("Content-Length: ", ToString(request->PostData->size()), "\r\n");
+ ContentType_ = TString::Join("Content-Type: ", ContentType_, "\r\n");
+ Headers_.push_back(ContentLength_.c_str());
+ Headers_.push_back(ContentType_.c_str());
+ }
+
+ Headers_.push_back((const char*)nullptr); // array terminator
+ }
+
+ inline const char* const* Data() { // Valid while this object and the request passed to Build() are alive and unchanged.
+ return Headers_.data();
+ }
+
+ private:
+ TVector<const char*> Headers_; // non-owning pointers into the members below and into request->ExtraHeaders
+ TVector<TString> Fixed_; // ExtraHeaders entries that needed "\r\n" appended
+ TString BasicAuth_;
+ TString OAuth_;
+ TString AcceptEncoding_;
+ TString ContentLength_;
+ TString ContentType_;
+ };
+
+ TResultRef FetchSingleImpl(TRequestRef request, TSocketPool* pool) { // Runs one fetch for request, optionally taking/returning a socket via pool; exceptions in the try block become UNKNOWN_ERROR.
+ Y_ASSERT(!!request->Url && "no url passed in fetch request");
+ TResultRef result(new TResult(request->Url));
+ try {
+ TSimpleFetcherFetcher fetcher;
+ THttpHeader header;
+
+ THttpURL::EKind kind = THttpURL::SchemeHTTP;
+ ui16 port = 80;
+ TString host;
+ ParseUrl(request->Url, kind, host, port); // throws on an unparsable url (caught below)
+
+ TString path = ToString(GetPathAndQuery(request->Url));
+
+ bool defaultPort = (kind == THttpURL::SchemeHTTP && port == 80) ||
+ (kind == THttpURL::SchemeHTTPS && port == 443);
+
+ if (request->UnixSocketPath && !request->UnixSocketPath->empty()) { // unix socket takes precedence over any host handling
+ TAddrList addrs;
+ addrs.emplace_back(new NAddr::TUnixSocketAddr{*request->UnixSocketPath});
+ fetcher.SetHost(host.data(), port, addrs, kind);
+ } else if (host == "127.0.0.1") {
+ // todo: correctly handle /etc/hosts records && ip-addresses
+
+ // bypass normal DNS resolving for localhost
+ TAddrList addrs({new NAddr::TIPv4Addr(TIpAddress(0x0100007F, port))});
+ fetcher.SetHost(host.data(), port, addrs, kind);
+ } else if (host == "localhost") { // IPv6 loopback is listed first, then IPv4 loopback
+ sockaddr_in6 ipv6Addr = IPv6Loopback();
+ ipv6Addr.sin6_port = HostToInet(port);
+
+ TAddrList addrs({new NAddr::TIPv6Addr(ipv6Addr), new NAddr::TIPv4Addr(TIpAddress(0x0100007F, port))});
+ fetcher.SetHost(host.data(), port, addrs, kind);
+ } else {
+ Y_ASSERT(!!host && "no host detected in url passed");
+ fetcher.SetHost(host.data(), port, kind); // no explicit addresses: resolution is left to the fetcher
+ }
+ header.Init();
+
+ THeaders headers; // must outlive fetcher.Fetch(): headers.Data() points into it
+ headers.Build(request);
+
+ TString hostHeader = (!!request->CustomHost ? *request->CustomHost : host) +
+ (defaultPort ? "" : ":" + ToString(port)); // port is appended only when it is not the scheme default
+ fetcher.SetHostHeader(hostHeader.data());
+
+ if (!!request->UserAgent) {
+ fetcher.SetIdentification((*request->UserAgent).data(), nullptr);
+ } else {
+ fetcher.SetIdentification("Mozilla/5.0 (compatible; YandexNews/3.0; +http://yandex.com/bots)", nullptr);
+ }
+
+ if (!!request->Method) {
+ fetcher.SetMethod(request->Method->data(), request->Method->size());
+ }
+
+ if (!!request->PostData) {
+ fetcher.SetPostData(request->PostData->data(), request->PostData->size());
+ }
+
+ fetcher.SetMaxBodySize(request->MaxBodySize);
+ fetcher.SetMaxHeaderSize(request->MaxHeaderSize);
+
+ if (request->ConnectTimeout) {
+ fetcher.SetConnectTimeout(*request->ConnectTimeout);
+ }
+
+ {
+ const TDuration rest = request->Deadline - Now(); // time left until the request deadline
+ fetcher.SetTimeout(request->RdWrTimeout != TDuration::Zero()
+ ? ::Min(request->RdWrTimeout, rest)
+ : rest); // an explicit RdWrTimeout is capped by the remaining time
+ }
+ fetcher.SetNeedDataCallback(request->NeedDataCallback);
+ bool persistent = !request->OnlyHeaders; // connections are kept for reuse only when a body is fetched
+ void* handle = nullptr; // identity of the pooled socket handed to the fetcher, if any
+
+ if (pool) {
+ while (auto socket = pool->GetSocket(host, port)) { // sockets that are no longer Good() are dropped and the next one is tried
+ if (socket->Good()) {
+ handle = socket.Get();
+
+ fetcher.SetSocket(socket.Release()); // ownership moves to the fetcher
+ fetcher.SetPersistent(true);
+ break;
+ }
+ }
+ }
+
+ int fetchResult = fetcher.Fetch(&header, path.data(), headers.Data(), persistent, request->OnlyHeaders);
+
+ if (!fetcher.Data.empty()) { // parse whatever was received, even if the fetch reported an error
+ TStringInput httpIn(fetcher.Data.Str());
+ ParseHttpResponse(*result, httpIn, kind, host, port);
+ }
+
+ if (fetchResult < 0 || header.error != 0) {
+ result->Code = header.error; // NOTE(review): with fetchResult < 0 and header.error == 0 this stores 0 - confirm that is intended
+ }
+
+ if (pool && persistent && !header.error && !header.connection_closed) {
+ THolder<TSocketPool::TSocketHandle> socket(fetcher.PickOutSocket());
+
+ if (!!socket && socket->Good()) {
+ if (handle == socket.Get()) { // same socket object we took from the pool above
+ result->ConnectionReused = true;
+ }
+ pool->ReturnSocket(host, port, std::move(socket));
+ }
+ }
+ } catch (...) { // any failure above is reported through the result code rather than rethrown
+ result->Code = UNKNOWN_ERROR;
+ }
+ return result;
+ }
+}
diff --git a/library/cpp/http/client/fetch/fetch_single.h b/library/cpp/http/client/fetch/fetch_single.h
new file mode 100644
index 0000000000..890c925cc9
--- /dev/null
+++ b/library/cpp/http/client/fetch/fetch_single.h
@@ -0,0 +1,88 @@
+#pragma once
+
+#include "cosocket.h"
+#include "fetch_request.h"
+#include "fetch_result.h"
+#include "pool.h"
+
+#include <library/cpp/coroutine/dns/helpers.h>
+#include <library/cpp/http/client/ssl/sslsock.h>
+#include <library/cpp/http/fetch_gpl/httpagent.h>
+#include <library/cpp/http/fetch/httpfetcher.h>
+#include <library/cpp/http/fetch/httpheader.h>
+
+#include <util/generic/algorithm.h>
+
+namespace NHttpFetcher {
+ class TCoIpResolver { // Resolver policy that uses the resolver and DNS cache published through CoCtx().
+ public:
+ TAddrList Resolve(const char* host, TIpPort port) const { // Returns resolved addresses with IPv6 ones first; empty list on any exception.
+ NAsyncDns::TAddrs addrs;
+ try {
+ NAsyncDns::ResolveAddr(*CoCtx()->Resolver, host, port, addrs, CoCtx()->DnsCache);
+ } catch (...) { // resolution failures are reported as "no addresses"
+ return TAddrList();
+ }
+
+ // prefer IPv6
+ SortBy(addrs.begin(), addrs.end(), [](const auto& addr) {
+ return addr->Addr()->sa_family == AF_INET6 ? 0 : 1; // key 0 sorts before key 1
+ });
+
+ return TAddrList(addrs.begin(), addrs.end());
+ }
+ };
+
+ struct TStringSaver { // Writer policy that accumulates everything written into an in-memory stream.
+ int Write(const void* buf, size_t len) { // Appends len bytes to Data; always returns 0.
+ Data.Write(buf, len);
+ return 0;
+ }
+ TStringStream Data; // read back by FetchSingleImpl via fetcher.Data
+ };
+
+ struct TSimpleCheck { // Checker policy: holds header/body size limits and an optional "keep fetching?" callback.
+ inline bool Check(THttpHeader*) { // Always returns false.
+ return false;
+ }
+ void CheckDocPart(void* data, size_t size, THttpHeader*) { // Feeds each received chunk to NeedDataCallback, if one is set.
+ if (!!NeedDataCallback) {
+ CheckData.append(static_cast<const char*>(data), size); // append in place: no temporary TString per chunk
+ if (!NeedDataCallback(CheckData)) { // the callback sees everything received so far
+ BodyMax = 0; // dropping the body limit to zero is how a stop request is signalled
+ }
+ }
+ }
+ void CheckEndDoc(THttpHeader*) {
+ }
+ size_t GetMaxHeaderSize() {
+ return HeaderMax;
+ }
+ size_t GetMaxBodySize(THttpHeader*) {
+ return BodyMax;
+ }
+ void SetMaxHeaderSize(size_t headerMax) {
+ HeaderMax = headerMax;
+ }
+ void SetMaxBodySize(size_t bodyMax) {
+ BodyMax = bodyMax;
+ }
+ void SetNeedDataCallback(const TNeedDataCallback& callback) {
+ NeedDataCallback = callback;
+ }
+
+ private:
+ size_t HeaderMax; // NOTE(review): no initializer - indeterminate until SetMaxHeaderSize() is called
+ size_t BodyMax; // NOTE(review): no initializer - indeterminate until SetMaxBodySize() is called
+ TNeedDataCallback NeedDataCallback;
+ TString CheckData; // all chunks seen so far, accumulated only while a callback is set
+ };
+
+ using TSimpleHttpAgent = THttpsAgent<TCoSocketHandler, TCoIpResolver, // agent assembled from the coroutine socket handler and resolver above
+ TSslSocketBase::TFakeLogger, TNoTimer,
+ NHttpFetcher::TSslSocketHandler>;
+ using TSimpleFetcherFetcher = THttpFetcher<TFakeAlloc<>, TSimpleCheck, TStringSaver, TSimpleHttpAgent>; // fetcher type instantiated by FetchSingleImpl
+
+ //! Private method of fetcher library. Don't use it in your code.
+ TResultRef FetchSingleImpl(TRequestRef request, TSocketPool* pool = nullptr); // pool == nullptr disables connection reuse
+}
diff --git a/library/cpp/http/client/fetch/parse.cpp b/library/cpp/http/client/fetch/parse.cpp
new file mode 100644
index 0000000000..62d610102b
--- /dev/null
+++ b/library/cpp/http/client/fetch/parse.cpp
@@ -0,0 +1,160 @@
+#include "codes.h"
+#include "parse.h"
+
+#include <library/cpp/charset/codepage.h>
+#include <library/cpp/http/io/stream.h>
+#include <library/cpp/mime/types/mime.h>
+#include <library/cpp/uri/uri.h>
+
+#include <library/cpp/string_utils/url/url.h>
+#include <util/string/vector.h>
+
+namespace NHttpFetcher {
+ namespace {
+ static TString MimeTypeFromUrl(const NUri::TUri& httpUrl) { // Guesses a MIME type from the url path extension; "" when there is no extension or no match.
+ TStringBuf path = httpUrl.GetField(NUri::TField::FieldPath);
+ size_t pos = path.find_last_of('.');
+ if (pos == TStringBuf::npos) {
+ return "";
+ }
+ // TODO (stanly) replace TString with TStringBuf
+ TString ext = TString(path.substr(pos + 1)); // text after the last '.' in the path
+ TString mime = mimetypeByExt(path.data()); // NOTE(review): passes the raw path pointer - assumes the field is null-terminated; confirm
+ if (mime) { // library lookup wins; the table below is only a fallback
+ return mime;
+ }
+
+ if (ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "gif") {
+ return "image/" + ext;
+ } else if (ext == "m4v" || ext == "mp4" || ext == "flv" || ext == "mpeg") {
+ return "video/" + ext;
+ } else if (ext == "mp3" || ext == "wav" || ext == "ogg") {
+ return "audio/" + ext;
+ } else if (ext == "zip" || ext == "doc" || ext == "docx" || ext == "xls" || ext == "xlsx" || ext == "pdf" || ext == "ppt") {
+ return "application/" + ext;
+ } else if (ext == "rar" || ext == "7z") {
+ return "application/x-" + ext + "-compressed";
+ } else if (ext == "exe") {
+ return "application/octet-stream";
+ }
+
+ return "";
+ }
+
+ static TString MimeTypeFromUrl(const TString& url) { // String overload: parses url and delegates to the TUri overload; "" when parsing fails.
+ static const ui64 flags = NUri::TFeature::FeaturesRobot | NUri::TFeature::FeatureToLower;
+
+ NUri::TUri httpUrl;
+ if (httpUrl.Parse(url, flags) != NUri::TUri::ParsedOK) {
+ return "";
+ }
+
+ return MimeTypeFromUrl(httpUrl);
+ }
+
+ // Extracts encoding & content-type from headers
+ static void ProcessHeaders(TResult& result) { // Fills result.MimeType and result.Encoding from Content-Type, falling back to the url extension for the MIME type.
+ for (THttpHeaders::TConstIterator it = result.Headers.Begin(); it != result.Headers.End(); it++) {
+ TString name = it->Name();
+ name.to_lower(); // header names are matched case-insensitively
+ if (name == "content-type") {
+ TString value = it->Value();
+ value.to_lower();
+ size_t delimPos = value.find(';');
+ if (delimPos == TString::npos) {
+ delimPos = value.size();
+ }
+ result.MimeType = value.substr(0, delimPos); // everything before the first ';'
+ size_t charsetPos = value.find("charset=");
+ if (charsetPos == TString::npos) {
+ continue;
+ }
+ delimPos = value.find(';', charsetPos + 1); // end of the charset parameter, if another parameter follows
+ TString charsetStr = value.substr(charsetPos + 8, // 8 == strlen("charset=")
+ delimPos == TString::npos ? delimPos : delimPos - charsetPos - 8);
+ ECharset charset = CharsetByName(charsetStr.data()); // NOTE(review): a quoted value (charset="utf-8") is passed with its quotes
+ if (charset != CODES_UNSUPPORTED && charset != CODES_UNKNOWN) {
+ result.Encoding = charset;
+ }
+ }
+ }
+
+ if (result.MimeType.empty() || result.MimeType == "application/octet-stream") { // missing or generic type: try the url
+ const TString& detectedMimeType = MimeTypeFromUrl(result.ResolvedUrl);
+ if (detectedMimeType) {
+ result.MimeType = detectedMimeType;
+ }
+ }
+ }
+
+ }
+
+ void ParseHttpResponse(TResult& result, IInputStream& is, THttpURL::EKind kind, // Parses a raw HTTP response from is into result;
+ TStringBuf host, ui16 port) { // kind/host/port are used to make a relative Location absolute.
+ THttpInput httpIn(&is);
+ TString firstLine = httpIn.FirstLine();
+ TVector<TString> params = SplitString(firstLine, " ");
+ try {
+ if (params.size() < 2) { // need at least "<version> <code>"
+ ythrow yexception() << "failed to parse first line";
+ }
+ result.HttpVersion = params[0];
+ result.Code = FromString(params[1]);
+ } catch (const std::exception&) { // malformed status line or non-numeric code
+ result.Code = WRONG_HTTP_HEADER_CODE;
+ }
+ for (auto it = httpIn.Headers().Begin(); it != httpIn.Headers().End(); ++it) { // '!=' as in ProcessHeaders; works for any iterator category
+ const THttpInputHeader& header = *it;
+ TString name = header.Name();
+ name.to_lower();
+ if (name == "location" && IsRedirectCode(result.Code)) { // Location is honoured only for redirect codes
+ // TODO (stanly) use correct routine to parse location
+ result.Location = header.Value();
+ result.ResolvedUrl = header.Value();
+ if (result.ResolvedUrl.StartsWith('/')) { // host-relative target: prepend scheme, host and non-default port
+ const bool defaultPort =
+ (kind == THttpURL::SchemeHTTP && port == 80) ||
+ (kind == THttpURL::SchemeHTTPS && port == 443);
+
+ result.ResolvedUrl = TString(NUri::SchemeKindToString(kind)) + "://" + host +
+ (defaultPort ? "" : ":" + ToString(port)) +
+ result.ResolvedUrl;
+ }
+ }
+ }
+ try {
+ result.Headers = httpIn.Headers();
+ result.Data = httpIn.ReadAll();
+ ProcessHeaders(result);
+ // TODO (stanly) try to detect mime-type by content
+ } catch (const yexception& /* exception */) { // only yexception is mapped; other exception types propagate to the caller
+ result.Code = WRONG_HTTP_RESPONSE;
+ }
+ }
+
+ void ParseHttpResponse(TResult& result, IInputStream& stream, const TString& url) { // Convenience overload: derives kind/host/port from url, then parses.
+ THttpURL::EKind kind; // left uninitialised here: ParseUrl either assigns all three outputs or throws
+ TString host;
+ ui16 port;
+ ParseUrl(url, kind, host, port);
+ ParseHttpResponse(result, stream, kind, host, port);
+ }
+
+ void ParseUrl(const TStringBuf url, THttpURL::EKind& kind, TString& host, ui16& port) { // Extracts scheme kind, host and port from url; throws yexception when parsing fails.
+ using namespace NUri;
+
+ static const int URI_PARSE_FLAGS =
+ TFeature::FeatureSchemeKnown | TFeature::FeatureConvertHostIDN | TFeature::FeatureEncodeExtendedDelim | TFeature::FeatureEncodePercent;
+
+ TUri uri;
+ // Cut out url's path to speedup processing.
+ if (uri.Parse(GetSchemeHostAndPort(url, false, false), URI_PARSE_FLAGS) != TUri::ParsedOK) {
+ ythrow yexception() << "can't parse url: " << url;
+ }
+
+ kind = uri.GetScheme(); // outputs are written only after a successful parse
+ host = uri.GetField(TField::FieldHost);
+ port = uri.GetPort();
+ }
+
+}
diff --git a/library/cpp/http/client/fetch/parse.h b/library/cpp/http/client/fetch/parse.h
new file mode 100644
index 0000000000..dacfa9bf84
--- /dev/null
+++ b/library/cpp/http/client/fetch/parse.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "fetch_result.h"
+#include <library/cpp/uri/http_url.h>
+
+namespace NHttpFetcher {
+ void ParseUrl(const TStringBuf url, THttpURL::EKind& kind, TString& host, ui16& port); // fills kind/host/port; throws yexception on an unparsable url
+
+ void ParseHttpResponse(TResult& result, IInputStream& stream, THttpURL::EKind kind, // parses status line, headers and body into result
+ TStringBuf host, ui16 port);
+
+ void ParseHttpResponse(TResult& result, IInputStream& stream, const TString& url); // same, deriving kind/host/port from url via ParseUrl
+
+}
diff --git a/library/cpp/http/client/fetch/pool.cpp b/library/cpp/http/client/fetch/pool.cpp
new file mode 100644
index 0000000000..f0a142eced
--- /dev/null
+++ b/library/cpp/http/client/fetch/pool.cpp
@@ -0,0 +1,57 @@
+#include "pool.h"
+
+namespace NHttpFetcher {
+ void TSocketPool::Clear() { // Empties the pool; the removed sockets are destroyed after the lock is released.
+ TSocketMap sockets; // receives the old contents and is destroyed at function exit
+
+ {
+ auto g(Guard(Lock_));
+ Sockets_.swap(sockets); // only the swap happens under the lock
+ }
+ }
+
+ void TSocketPool::Drain(const TDuration timeout) { // Removes every socket returned to the pool more than timeout ago.
+ const TInstant now = TInstant::Now();
+ TVector<THolder<TSocketHandle>> sockets; // expired sockets are parked here so they are destroyed outside the lock
+
+ {
+ auto g(Guard(Lock_));
+ for (auto si = Sockets_.begin(); si != Sockets_.end();) {
+ if (si->second.Touched + timeout < now) { // Touched is the time of the last ReturnSocket()
+ sockets.push_back(std::move(si->second.Socket));
+ Sockets_.erase(si++); // advance before erase so the loop iterator stays valid
+ } else {
+ ++si;
+ }
+ }
+ }
+ }
+
+ THolder<TSocketPool::TSocketHandle> TSocketPool::GetSocket(const TString& host, const TIpPort port) { // Takes one pooled socket for (host, port) out of the pool; empty holder when none.
+ THolder<TSocketPool::TSocketHandle> socket;
+
+ {
+ auto g(Guard(Lock_));
+ auto si = Sockets_.find(std::make_pair(host, port));
+ if (si != Sockets_.end()) {
+ socket = std::move(si->second.Socket); // ownership moves to the caller
+ Sockets_.erase(si); // the entry is removed, so a socket is never handed out twice
+ }
+ }
+
+ return socket;
+ }
+
+ void TSocketPool::ReturnSocket(const TString& host, const TIpPort port, THolder<TSocketHandle> socket) { // Stores socket under (host, port) and stamps it with the current time.
+ TConnection conn;
+
+ conn.Socket = std::move(socket);
+ conn.Touched = TInstant::Now(); // compared against the timeout in Drain()
+
+ {
+ auto g(Guard(Lock_));
+ Sockets_.emplace(std::make_pair(host, port), std::move(conn)); // multimap: several sockets per endpoint may coexist
+ }
+ }
+
+}
diff --git a/library/cpp/http/client/fetch/pool.h b/library/cpp/http/client/fetch/pool.h
new file mode 100644
index 0000000000..73c3eda0c6
--- /dev/null
+++ b/library/cpp/http/client/fetch/pool.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "cosocket.h"
+
+#include <library/cpp/http/client/ssl/sslsock.h>
+
+#include <util/generic/hash_multi_map.h>
+#include <util/generic/ptr.h>
+#include <util/system/mutex.h>
+
+namespace NHttpFetcher {
+ class TSocketPool { // Mutex-protected store of idle sockets keyed by (host, port).
+ public:
+ using TSocketHandle = TSslSocketHandler<TCoSocketHandler, TSslSocketBase::TFakeLogger>;
+
+ public:
+ /// Closes all sockets.
+ void Clear();
+
+ /// Closes all sockets that were returned to the pool more than timeout ago.
+ void Drain(const TDuration timeout);
+
+ /// Returns socket for the given endpoint if available.
+ THolder<TSocketHandle> GetSocket(const TString& host, const TIpPort port);
+
+ /// Puts socket to the pool.
+ void ReturnSocket(const TString& host, const TIpPort port, THolder<TSocketHandle> socket);
+
+ private:
+ struct TConnection {
+ THolder<TSocketHandle> Socket; // owned while the connection sits in the pool
+ TInstant Touched; // time the socket was last returned to the pool
+ };
+
+ using TSocketMap = THashMultiMap<std::pair<TString, TIpPort>, TConnection>;
+
+ TMutex Lock_; // guards Sockets_
+ TSocketMap Sockets_;
+ };
+
+}