diff options
author | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/http/client/fetch | |
parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
download | ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz |
validate canons without yatest_common
Diffstat (limited to 'library/cpp/http/client/fetch')
-rw-r--r-- | library/cpp/http/client/fetch/coctx.cpp | 1 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/coctx.h | 50 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/codes.h | 36 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/cosocket.h | 97 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/fetch_request.cpp | 114 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/fetch_request.h | 65 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/fetch_result.cpp | 32 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/fetch_result.h | 40 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/fetch_single.cpp | 205 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/fetch_single.h | 88 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/parse.cpp | 160 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/parse.h | 14 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/pool.cpp | 57 | ||||
-rw-r--r-- | library/cpp/http/client/fetch/pool.h | 41 |
14 files changed, 1000 insertions, 0 deletions
diff --git a/library/cpp/http/client/fetch/coctx.cpp b/library/cpp/http/client/fetch/coctx.cpp new file mode 100644 index 0000000000..dfbb6943a3 --- /dev/null +++ b/library/cpp/http/client/fetch/coctx.cpp @@ -0,0 +1 @@ +#include "coctx.h" diff --git a/library/cpp/http/client/fetch/coctx.h b/library/cpp/http/client/fetch/coctx.h new file mode 100644 index 0000000000..bd1b61cb59 --- /dev/null +++ b/library/cpp/http/client/fetch/coctx.h @@ -0,0 +1,50 @@ +#pragma once + +#include <library/cpp/coroutine/engine/impl.h> + +#include <util/thread/singleton.h> + +namespace NAsyncDns { + class TContResolver; + class TContDnsCache; +} + +namespace NHttpFetcher { + struct TCoCtx { + TContExecutor* Executor; + NAsyncDns::TContResolver* Resolver; + NAsyncDns::TContDnsCache* DnsCache; + + TCoCtx(TContExecutor* executor, NAsyncDns::TContResolver* resolver, NAsyncDns::TContDnsCache* dnsCache = nullptr) + : Executor(executor) + , Resolver(resolver) + , DnsCache(dnsCache) + { + } + + TCont* Cont() { + return Executor->Running(); + } + }; + + inline TCoCtx*& CoCtx() { + return *FastTlsSingletonWithPriority<TCoCtx*, 0>(); + } + + class TCoCtxSetter { + public: + TCoCtxSetter(TContExecutor* executor, NAsyncDns::TContResolver* resolver, NAsyncDns::TContDnsCache* dnsCache = nullptr) + : Instance(executor, resolver, dnsCache) + { + Y_VERIFY(!CoCtx(), "coCtx already exists"); + CoCtx() = &Instance; + } + + ~TCoCtxSetter() { + CoCtx() = nullptr; + } + + private: + TCoCtx Instance; + }; +} diff --git a/library/cpp/http/client/fetch/codes.h b/library/cpp/http/client/fetch/codes.h new file mode 100644 index 0000000000..25d09c88f8 --- /dev/null +++ b/library/cpp/http/client/fetch/codes.h @@ -0,0 +1,36 @@ +#pragma once + +namespace NHttpFetcher { + const int FETCH_SUCCESS_CODE = 200; + const int SERVICE_UNAVAILABLE = 503; + const int ZORA_TIMEOUT_CODE = 5000; + const int URL_FILTER_CODE = 6000; + const int WRONG_HTTP_HEADER_CODE = 6001; + const int FETCH_LARGE_FILE = 6002; + const int 
FETCH_CANNOT_PARSE = 6003; + const int HOSTS_QUEUE_TIMEOUT = 6004; + const int WRONG_HTTP_RESPONSE = 6005; + const int UNKNOWN_ERROR = 6006; + const int FETCHER_QUEUE_TIMEOUT = 6007; + const int FETCH_IGNORE = 6008; + const int FETCH_CANCELLED = 6009; + + inline bool IsRedirectCode(int code) { + return 301 == code || 302 == code || 303 == code || + 305 == code || 307 == code || 308 == code; + } + + inline bool IsSuccessCode(int code) { + return code >= 200 && code < 300; + } + + inline bool NoRefetch(int code) { + return code == 415 || // Unsupported media type + code == 601 || // Large file + (code >= 400 && code < 500) || + code == 1003 || // disallowed by robots.txt + code == 1006 || // not found by dns server + code == 6008; // once ignored, always ignored + } + +} diff --git a/library/cpp/http/client/fetch/cosocket.h b/library/cpp/http/client/fetch/cosocket.h new file mode 100644 index 0000000000..8230d36bbe --- /dev/null +++ b/library/cpp/http/client/fetch/cosocket.h @@ -0,0 +1,97 @@ +#pragma once + +#include "coctx.h" + +#include <library/cpp/coroutine/engine/network.h> +#include <library/cpp/http/fetch_gpl/sockhandler.h> + +#include <util/system/error.h> + +namespace NHttpFetcher { + class TCoSocketHandler { + public: + TCoSocketHandler() = default; + + ~TCoSocketHandler() { + Disconnect(); + } + + int Good() const { + return (Fd != INVALID_SOCKET); + } + + int Connect(const TAddrList& addrs, TDuration timeout) { + TCont* cont = CoCtx()->Cont(); + Timeout = timeout; + for (const auto& item : addrs) { + try { + const sockaddr* sa = item->Addr(); + TSocketHolder s(NCoro::Socket(sa->sa_family, SOCK_STREAM, 0)); + if (s.Closed()) { + continue; + } + int err = NCoro::ConnectT(cont, s, sa, item->Len(), Timeout); + if (err) { + s.Close(); + errno = err; + continue; + } + SetZeroLinger(s); + SetKeepAlive(s, true); + Fd.Swap(s); + return 0; + } catch (const TSystemError&) { + } + } + return errno ? 
errno : EBADF; + } + + void Disconnect() { + if (Fd.Closed()) + return; + try { + ShutDown(Fd, SHUT_RDWR); + } catch (const TSystemError&) { + } + Fd.Close(); + } + + void shutdown() { + try { + ShutDown(Fd, SHUT_WR); + } catch (TSystemError&) { + } + } + + ssize_t send(const void* message, size_t messlen) { + TCont* cont = CoCtx()->Cont(); + TContIOStatus status = NCoro::WriteT(cont, Fd, message, messlen, Timeout); + errno = status.Status(); + return status.Status() ? -1 : (ssize_t)status.Processed(); + } + + bool peek() { + TCont* cont = CoCtx()->Cont(); + if ((errno = NCoro::PollT(cont, Fd, CONT_POLL_READ, Timeout))) + return false; + char buf[1]; +#ifdef _win32_ + return (1 == ::recv(Fd, buf, 1, MSG_PEEK)); +#else + return (1 == ::recv(Fd, buf, 1, MSG_PEEK | MSG_DONTWAIT)); +#endif + } + + ssize_t read(void* message, size_t messlen) { + TCont* cont = CoCtx()->Cont(); + TContIOStatus status = NCoro::ReadT(cont, Fd, message, messlen, Timeout); + errno = status.Status(); + return status.Status() ? 
-1 : (ssize_t)status.Processed(); + } + + protected: + TSocketHolder Fd; + TDuration Timeout; + static THolder<TIpAddress> AddrToBind; + }; +} diff --git a/library/cpp/http/client/fetch/fetch_request.cpp b/library/cpp/http/client/fetch/fetch_request.cpp new file mode 100644 index 0000000000..2f8453fc45 --- /dev/null +++ b/library/cpp/http/client/fetch/fetch_request.cpp @@ -0,0 +1,114 @@ +#include "fetch_request.h" + +#include <library/cpp/deprecated/atomic/atomic.h> + +// TRequest +namespace NHttpFetcher { + const TString DEFAULT_ACCEPT_ENCODING = "gzip, deflate"; + const size_t DEFAULT_MAX_HEADER_SIZE = 100 << 10; + const size_t DEFAULT_MAX_BODY_SIZE = 1 << 29; + + TRequest::TRequest(const TString& url, TCallBack onFetch) + : Url(url) + , Deadline(TInstant::Now() + DEFAULT_REQUEST_TIMEOUT) + , Freshness(DEFAULT_REQUEST_FRESHNESS) + , Priority(40) + , IgnoreRobotsTxt(false) + , LangRegion(ELR_RU) + , OnFetch(onFetch) + , AcceptEncoding(DEFAULT_ACCEPT_ENCODING) + , OnlyHeaders(false) + , MaxHeaderSize(DEFAULT_MAX_HEADER_SIZE) + , MaxBodySize(DEFAULT_MAX_BODY_SIZE) + { + GenerateSequence(); + } + + TRequest::TRequest(const TString& url, bool ignoreRobotsTxt, TDuration timeout, TDuration freshness, TCallBack onFetch) + : Url(url) + , Deadline(Now() + timeout) + , Freshness(freshness) + , Priority(40) + , IgnoreRobotsTxt(ignoreRobotsTxt) + , LangRegion(ELR_RU) + , OnFetch(onFetch) + , AcceptEncoding(DEFAULT_ACCEPT_ENCODING) + , OnlyHeaders(false) + , MaxHeaderSize(DEFAULT_MAX_HEADER_SIZE) + , MaxBodySize(DEFAULT_MAX_BODY_SIZE) + { + GenerateSequence(); + } + + TRequest::TRequest(const TString& url, TDuration timeout, TDuration freshness, bool ignoreRobots, + size_t priority, const TMaybe<TString>& login, const TMaybe<TString>& password, + ELangRegion langRegion, TCallBack onFetch) + : Url(url) + , Deadline(Now() + timeout) + , Freshness(freshness) + , Priority(priority) + , Login(login) + , Password(password) + , IgnoreRobotsTxt(ignoreRobots) + , LangRegion(langRegion) 
+ , OnFetch(onFetch) + , AcceptEncoding(DEFAULT_ACCEPT_ENCODING) + , OnlyHeaders(false) + , MaxHeaderSize(DEFAULT_MAX_HEADER_SIZE) + , MaxBodySize(DEFAULT_MAX_BODY_SIZE) + { + GenerateSequence(); + } + + void TRequest::GenerateSequence() { + static TAtomic nextSeq = 0; + Sequence = AtomicIncrement(nextSeq); + } + + TRequestRef TRequest::Clone() { + THolder<TRequest> request = THolder<TRequest>(new TRequest(*this)); + request->GenerateSequence(); + return request.Release(); + } + + void TRequest::Dump(IOutputStream& out) { + out << "url: " << Url << "\n"; + out << "timeout: " << (Deadline - Now()).MilliSeconds() << " ms\n"; + out << "freshness: " << Freshness.Seconds() << "\n"; + out << "priority: " << Priority << "\n"; + if (!!Login) { + out << "login: " << *Login << "\n"; + } + if (!!Password) { + out << "password: " << *Password << "\n"; + } + if (!!OAuthToken) { + out << "oauth token: " << *OAuthToken << "\n"; + } + if (IgnoreRobotsTxt) { + out << "ignore robots: " << IgnoreRobotsTxt << "\n"; + } + out << "lang reg: " << LangRegion2Str(LangRegion) << "\n"; + if (!!CustomHost) { + out << "custom host: " << *CustomHost << "\n"; + } + if (!!UserAgent) { + out << "user agent: " << *UserAgent << "\n"; + } + if (!!AcceptEncoding) { + out << "accept enc: " << *AcceptEncoding << "\n"; + } + if (OnlyHeaders) { + out << "only headers: " << OnlyHeaders << "\n"; + } + out << "max header sz: " << MaxHeaderSize << "\n"; + out << "max body sz: " << MaxBodySize << "\n"; + if (!!PostData) { + out << "post data: " << *PostData << "\n"; + } + if (!!ContentType) { + out << "content type: " << *ContentType << "\n"; + } + } + +} diff --git a/library/cpp/http/client/fetch/fetch_request.h b/library/cpp/http/client/fetch/fetch_request.h new file mode 100644 index 0000000000..169c2940d7 --- /dev/null +++ b/library/cpp/http/client/fetch/fetch_request.h @@ -0,0 +1,65 @@ +#pragma once + +#include "fetch_result.h" + +#include <kernel/langregion/langregion.h> + +#include 
<util/datetime/base.h> +#include <util/generic/ptr.h> + +namespace NHttpFetcher { + const TDuration DEFAULT_REQUEST_TIMEOUT = TDuration::Minutes(1); + const TDuration DEFAULT_REQUEST_FRESHNESS = TDuration::Seconds(10000); + + class TRequest; + using TRequestRef = TIntrusivePtr<TRequest>; + + class TRequest: public TAtomicRefCount<TRequest> { + private: + TRequest(const TRequest&) = default; + TRequest& operator=(const TRequest&) = default; + void GenerateSequence(); + + public: + TRequest(const TString& url = "", TCallBack onFetch = TCallBack()); + TRequest(const TString& url, bool ignoreRobotsTxt, TDuration timeout, + TDuration freshness, TCallBack onFetch = TCallBack()); + TRequest(const TString& url, TDuration timeout, TDuration freshness, bool ignoreRobots, + size_t priority, const TMaybe<TString>& login = TMaybe<TString>(), + const TMaybe<TString>& password = TMaybe<TString>(), + ELangRegion langRegion = ELR_RU, TCallBack onFetch = TCallBack()); + void Dump(IOutputStream& out); + TRequestRef Clone(); + + public: + TString Url; + TMaybe<TString> UnixSocketPath; + + TInstant Deadline; // [default = 1 min] + TDuration RdWrTimeout; + TMaybe<TDuration> ConnectTimeout; + TDuration Freshness; // [default = 10000 sec] + size_t Priority; // lower is more important; range [0, 100], default 40 + + TMaybe<TString> Login; + TMaybe<TString> Password; + TMaybe<TString> OAuthToken; + bool IgnoreRobotsTxt; // [default = false] + ELangRegion LangRegion; // [default = ELR_RU] + TCallBack OnFetch; // for async requests + ui64 Sequence; // unique id + TMaybe<TString> CustomHost; // Use custom host for "Host" header + TMaybe<TString> UserAgent; // custom user agent, [default = YandexNews] + TMaybe<TString> AcceptEncoding; // custom accept encoding, [default = "gzip, deflate"] + bool OnlyHeaders; // [default = false], if true - no content will be fetched (HEAD request) + size_t MaxHeaderSize; // returns 1002 error if exceeded + size_t MaxBodySize; // returns 1002 error if exceeded + 
TNeedDataCallback NeedDataCallback; // set this callback if you need to check data while fetching + // true - continue fetching, false - stop + TMaybe<TString> Method; // for http exotics like "PUT ", "PATCH ", "DELETE ". if it doesn't exist, GET or POST will be used + TMaybe<TString> PostData; // if exists - send post request + TMaybe<TString> ContentType; // custom content-type for post requests + // [default = "application/x-www-form-urlencoded"] + TVector<TString> ExtraHeaders; // needed for some servers (to auth, for ex.); don't forget to add "\r\n"! + }; +} diff --git a/library/cpp/http/client/fetch/fetch_result.cpp b/library/cpp/http/client/fetch/fetch_result.cpp new file mode 100644 index 0000000000..0ba1b1e6be --- /dev/null +++ b/library/cpp/http/client/fetch/fetch_result.cpp @@ -0,0 +1,32 @@ +#include "codes.h" +#include "fetch_result.h" + +#include <library/cpp/charset/recyr.hh> + +namespace NHttpFetcher { + TResult::TResult(const TString& url, int code) + : RequestUrl(url) + , ResolvedUrl(url) + , Code(code) + , ConnectionReused(false) + { + } + + TString TResult::DecodeData(bool* decoded) const { + if (!!Encoding && *Encoding != CODES_UTF8) { + if (decoded) { + *decoded = true; + } + return Recode(*Encoding, CODES_UTF8, Data); + } + if (decoded) { + *decoded = false; + } + return Data; + } + + bool TResult::Success() const { + return Code == FETCH_SUCCESS_CODE; + } + +} diff --git a/library/cpp/http/client/fetch/fetch_result.h b/library/cpp/http/client/fetch/fetch_result.h new file mode 100644 index 0000000000..24fe49e1f6 --- /dev/null +++ b/library/cpp/http/client/fetch/fetch_result.h @@ -0,0 +1,40 @@ +#pragma once + +#include <library/cpp/charset/doccodes.h> +#include <library/cpp/http/io/headers.h> +#include <library/cpp/langs/langs.h> + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> + +#include <functional> + +namespace NHttpFetcher { + // Result + using TResultRef = 
TIntrusivePtr<struct TResult>; + struct TResult: public TAtomicRefCount<TResult> { + TResult(const TString& url, int code = 0); + TString DecodeData(bool* decoded = nullptr) const; + bool Success() const; + + public: + TString RequestUrl; + TString ResolvedUrl; + TString Location; + int Code; + bool ConnectionReused; + TString StatusStr; + TString MimeType; + THttpHeaders Headers; + TString Data; + TMaybe<ECharset> Encoding; + TMaybe<ELanguage> Language; + TVector<TResultRef> Redirects; + TString HttpVersion; + }; + + using TCallBack = std::function<void(TResultRef)>; + using TNeedDataCallback = std::function<bool(const TString&)>; +} diff --git a/library/cpp/http/client/fetch/fetch_single.cpp b/library/cpp/http/client/fetch/fetch_single.cpp new file mode 100644 index 0000000000..27d0888b80 --- /dev/null +++ b/library/cpp/http/client/fetch/fetch_single.cpp @@ -0,0 +1,205 @@ +#include "codes.h" +#include "fetch_single.h" +#include "parse.h" + +#include <library/cpp/string_utils/base64/base64.h> + +#include <util/string/ascii.h> +#include <util/string/cast.h> +#include <library/cpp/string_utils/url/url.h> + +namespace NHttpFetcher { + static sockaddr_in6 IPv6Loopback() { + sockaddr_in6 sock = {}; + sock.sin6_family = AF_INET6; + sock.sin6_addr = IN6ADDR_LOOPBACK_INIT; + return sock; + } + + class THeaders { + public: + inline void Build(const TRequestRef& request) { + if (!!request->Password || !!request->Login) { + TString pass; + TString login; + TString raw; + TString encoded; + + if (!!request->Password) { + pass = *request->Password; + } + if (!!request->Login) { + login = *request->Login; + } + raw = TString::Join(login, ":", pass); + Base64Encode(raw, encoded); + BasicAuth_ = TString::Join("Authorization: Basic ", encoded, "\r\n"); + Headers_.push_back(BasicAuth_.c_str()); + } + + if (request->ExtraHeaders) { + Headers_.reserve(Headers_.size() + request->ExtraHeaders.size()); + for (const TString& header : request->ExtraHeaders) { + if 
(AsciiHasSuffixIgnoreCase(header, "\r\n")) { + Headers_.push_back(header.c_str()); + } else { + Fixed_.push_back(header + "\r\n"); + Headers_.push_back(Fixed_.back().c_str()); + } + } + } + + if (!!request->OAuthToken) { + OAuth_ = TString::Join("Authorization: OAuth ", *request->OAuthToken, "\r\n"); + Headers_.push_back(OAuth_.c_str()); + } + + if (!!request->AcceptEncoding) { + AcceptEncoding_ = TString::Join("Accept-Encoding: ", *request->AcceptEncoding, "\r\n"); + Headers_.push_back(AcceptEncoding_.c_str()); + } + + ContentType_ = "application/x-www-form-urlencoded"; + if (!!request->PostData) { + if (!!request->ContentType) { + ContentType_ = *request->ContentType; + } + ContentLength_ = TString::Join("Content-Length: ", ToString(request->PostData->size()), "\r\n"); + ContentType_ = TString::Join("Content-Type: ", ContentType_, "\r\n"); + Headers_.push_back(ContentLength_.c_str()); + Headers_.push_back(ContentType_.c_str()); + } + + Headers_.push_back((const char*)nullptr); + } + + inline const char* const* Data() { + return Headers_.data(); + } + + private: + TVector<const char*> Headers_; + TVector<TString> Fixed_; + TString BasicAuth_; + TString OAuth_; + TString AcceptEncoding_; + TString ContentLength_; + TString ContentType_; + }; + + TResultRef FetchSingleImpl(TRequestRef request, TSocketPool* pool) { + Y_ASSERT(!!request->Url && "no url passed in fetch request"); + TResultRef result(new TResult(request->Url)); + try { + TSimpleFetcherFetcher fetcher; + THttpHeader header; + + THttpURL::EKind kind = THttpURL::SchemeHTTP; + ui16 port = 80; + TString host; + ParseUrl(request->Url, kind, host, port); + + TString path = ToString(GetPathAndQuery(request->Url)); + + bool defaultPort = (kind == THttpURL::SchemeHTTP && port == 80) || + (kind == THttpURL::SchemeHTTPS && port == 443); + + if (request->UnixSocketPath && !request->UnixSocketPath->empty()) { + TAddrList addrs; + addrs.emplace_back(new NAddr::TUnixSocketAddr{*request->UnixSocketPath}); + 
fetcher.SetHost(host.data(), port, addrs, kind); + } else if (host == "127.0.0.1") { + // todo: correctly handle /etc/hosts records && ip-addresses + + // bypass normal DNS resolving for localhost + TAddrList addrs({new NAddr::TIPv4Addr(TIpAddress(0x0100007F, port))}); + fetcher.SetHost(host.data(), port, addrs, kind); + } else if (host == "localhost") { + sockaddr_in6 ipv6Addr = IPv6Loopback(); + ipv6Addr.sin6_port = HostToInet(port); + + TAddrList addrs({new NAddr::TIPv6Addr(ipv6Addr), new NAddr::TIPv4Addr(TIpAddress(0x0100007F, port))}); + fetcher.SetHost(host.data(), port, addrs, kind); + } else { + Y_ASSERT(!!host && "no host detected in url passed"); + fetcher.SetHost(host.data(), port, kind); + } + header.Init(); + + THeaders headers; + headers.Build(request); + + TString hostHeader = (!!request->CustomHost ? *request->CustomHost : host) + + (defaultPort ? "" : ":" + ToString(port)); + fetcher.SetHostHeader(hostHeader.data()); + + if (!!request->UserAgent) { + fetcher.SetIdentification((*request->UserAgent).data(), nullptr); + } else { + fetcher.SetIdentification("Mozilla/5.0 (compatible; YandexNews/3.0; +http://yandex.com/bots)", nullptr); + } + + if (!!request->Method) { + fetcher.SetMethod(request->Method->data(), request->Method->size()); + } + + if (!!request->PostData) { + fetcher.SetPostData(request->PostData->data(), request->PostData->size()); + } + + fetcher.SetMaxBodySize(request->MaxBodySize); + fetcher.SetMaxHeaderSize(request->MaxHeaderSize); + + if (request->ConnectTimeout) { + fetcher.SetConnectTimeout(*request->ConnectTimeout); + } + + { + const TDuration rest = request->Deadline - Now(); + fetcher.SetTimeout(request->RdWrTimeout != TDuration::Zero() + ? 
::Min(request->RdWrTimeout, rest) + : rest); + } + fetcher.SetNeedDataCallback(request->NeedDataCallback); + bool persistent = !request->OnlyHeaders; + void* handle = nullptr; + + if (pool) { + while (auto socket = pool->GetSocket(host, port)) { + if (socket->Good()) { + handle = socket.Get(); + + fetcher.SetSocket(socket.Release()); + fetcher.SetPersistent(true); + break; + } + } + } + + int fetchResult = fetcher.Fetch(&header, path.data(), headers.Data(), persistent, request->OnlyHeaders); + + if (!fetcher.Data.empty()) { + TStringInput httpIn(fetcher.Data.Str()); + ParseHttpResponse(*result, httpIn, kind, host, port); + } + + if (fetchResult < 0 || header.error != 0) { + result->Code = header.error; + } + + if (pool && persistent && !header.error && !header.connection_closed) { + THolder<TSocketPool::TSocketHandle> socket(fetcher.PickOutSocket()); + + if (!!socket && socket->Good()) { + if (handle == socket.Get()) { + result->ConnectionReused = true; + } + pool->ReturnSocket(host, port, std::move(socket)); + } + } + } catch (...) 
{ + result->Code = UNKNOWN_ERROR; + } + return result; + } +} diff --git a/library/cpp/http/client/fetch/fetch_single.h b/library/cpp/http/client/fetch/fetch_single.h new file mode 100644 index 0000000000..890c925cc9 --- /dev/null +++ b/library/cpp/http/client/fetch/fetch_single.h @@ -0,0 +1,88 @@ +#pragma once + +#include "cosocket.h" +#include "fetch_request.h" +#include "fetch_result.h" +#include "pool.h" + +#include <library/cpp/coroutine/dns/helpers.h> +#include <library/cpp/http/client/ssl/sslsock.h> +#include <library/cpp/http/fetch_gpl/httpagent.h> +#include <library/cpp/http/fetch/httpfetcher.h> +#include <library/cpp/http/fetch/httpheader.h> + +#include <util/generic/algorithm.h> + +namespace NHttpFetcher { + class TCoIpResolver { + public: + TAddrList Resolve(const char* host, TIpPort port) const { + NAsyncDns::TAddrs addrs; + try { + NAsyncDns::ResolveAddr(*CoCtx()->Resolver, host, port, addrs, CoCtx()->DnsCache); + } catch (...) { + return TAddrList(); + } + + // prefer IPv6 + SortBy(addrs.begin(), addrs.end(), [](const auto& addr) { + return addr->Addr()->sa_family == AF_INET6 ? 
0 : 1; + }); + + return TAddrList(addrs.begin(), addrs.end()); + } + }; + + struct TStringSaver { + int Write(const void* buf, size_t len) { + Data.Write(buf, len); + return 0; + } + TStringStream Data; + }; + + struct TSimpleCheck { + inline bool Check(THttpHeader*) { + return false; + } + void CheckDocPart(void* data, size_t size, THttpHeader*) { + if (!!NeedDataCallback) { + CheckData += TString(static_cast<const char*>(data), size); + if (!NeedDataCallback(CheckData)) { + BodyMax = 0; + } + } + } + void CheckEndDoc(THttpHeader*) { + } + size_t GetMaxHeaderSize() { + return HeaderMax; + } + size_t GetMaxBodySize(THttpHeader*) { + return BodyMax; + } + void SetMaxHeaderSize(size_t headerMax) { + HeaderMax = headerMax; + } + void SetMaxBodySize(size_t bodyMax) { + BodyMax = bodyMax; + } + void SetNeedDataCallback(const TNeedDataCallback& callback) { + NeedDataCallback = callback; + } + + private: + size_t HeaderMax; + size_t BodyMax; + TNeedDataCallback NeedDataCallback; + TString CheckData; + }; + + using TSimpleHttpAgent = THttpsAgent<TCoSocketHandler, TCoIpResolver, + TSslSocketBase::TFakeLogger, TNoTimer, + NHttpFetcher::TSslSocketHandler>; + using TSimpleFetcherFetcher = THttpFetcher<TFakeAlloc<>, TSimpleCheck, TStringSaver, TSimpleHttpAgent>; + + //! Private method of fetcher library. Don't use it in your code. 
+ TResultRef FetchSingleImpl(TRequestRef request, TSocketPool* pool = nullptr); +} diff --git a/library/cpp/http/client/fetch/parse.cpp b/library/cpp/http/client/fetch/parse.cpp new file mode 100644 index 0000000000..62d610102b --- /dev/null +++ b/library/cpp/http/client/fetch/parse.cpp @@ -0,0 +1,160 @@ +#include "codes.h" +#include "parse.h" + +#include <library/cpp/charset/codepage.h> +#include <library/cpp/http/io/stream.h> +#include <library/cpp/mime/types/mime.h> +#include <library/cpp/uri/uri.h> + +#include <library/cpp/string_utils/url/url.h> +#include <util/string/vector.h> + +namespace NHttpFetcher { + namespace { + static TString MimeTypeFromUrl(const NUri::TUri& httpUrl) { + TStringBuf path = httpUrl.GetField(NUri::TField::FieldPath); + size_t pos = path.find_last_of('.'); + if (pos == TStringBuf::npos) { + return ""; + } + // TODO (stanly) replace TString with TStringBuf + TString ext = TString(path.substr(pos + 1)); + TString mime = mimetypeByExt(path.data()); + if (mime) { + return mime; + } + + if (ext == "jpg" || ext == "jpeg" || ext == "png" || ext == "gif") { + return "image/" + ext; + } else if (ext == "m4v" || ext == "mp4" || ext == "flv" || ext == "mpeg") { + return "video/" + ext; + } else if (ext == "mp3" || ext == "wav" || ext == "ogg") { + return "audio/" + ext; + } else if (ext == "zip" || ext == "doc" || ext == "docx" || ext == "xls" || ext == "xlsx" || ext == "pdf" || ext == "ppt") { + return "application/" + ext; + } else if (ext == "rar" || ext == "7z") { + return "application/x-" + ext + "-compressed"; + } else if (ext == "exe") { + return "application/octet-stream"; + } + + return ""; + } + + static TString MimeTypeFromUrl(const TString& url) { + static const ui64 flags = NUri::TFeature::FeaturesRobot | NUri::TFeature::FeatureToLower; + + NUri::TUri httpUrl; + if (httpUrl.Parse(url, flags) != NUri::TUri::ParsedOK) { + return ""; + } + + return MimeTypeFromUrl(httpUrl); + } + + // Extracts encoding & content-type from headers + 
static void ProcessHeaders(TResult& result) { + for (THttpHeaders::TConstIterator it = result.Headers.Begin(); it != result.Headers.End(); it++) { + TString name = it->Name(); + name.to_lower(); + if (name == "content-type") { + TString value = it->Value(); + value.to_lower(); + size_t delimPos = value.find(';'); + if (delimPos == TString::npos) { + delimPos = value.size(); + } + result.MimeType = value.substr(0, delimPos); + size_t charsetPos = value.find("charset="); + if (charsetPos == TString::npos) { + continue; + } + delimPos = value.find(';', charsetPos + 1); + TString charsetStr = value.substr(charsetPos + 8, + delimPos == TString::npos ? delimPos : delimPos - charsetPos - 8); + ECharset charset = CharsetByName(charsetStr.data()); + if (charset != CODES_UNSUPPORTED && charset != CODES_UNKNOWN) { + result.Encoding = charset; + } + } + } + + if (result.MimeType.empty() || result.MimeType == "application/octet-stream") { + const TString& detectedMimeType = MimeTypeFromUrl(result.ResolvedUrl); + if (detectedMimeType) { + result.MimeType = detectedMimeType; + } + } + } + + } + + void ParseHttpResponse(TResult& result, IInputStream& is, THttpURL::EKind kind, + TStringBuf host, ui16 port) { + THttpInput httpIn(&is); + TString firstLine = httpIn.FirstLine(); + TVector<TString> params = SplitString(firstLine, " "); + try { + if (params.size() < 2) { + ythrow yexception() << "failed to parse first line"; + } + result.HttpVersion = params[0]; + result.Code = FromString(params[1]); + } catch (const std::exception&) { + result.Code = WRONG_HTTP_HEADER_CODE; + } + for (auto it = httpIn.Headers().Begin(); it < httpIn.Headers().End(); ++it) { + const THttpInputHeader& header = *it; + TString name = header.Name(); + name.to_lower(); + if (name == "location" && IsRedirectCode(result.Code)) { + // TODO (stanly) use correct routine to parse location + result.Location = header.Value(); + result.ResolvedUrl = header.Value(); + if (result.ResolvedUrl.StartsWith('/')) { + const 
bool defaultPort = + (kind == THttpURL::SchemeHTTP && port == 80) || + (kind == THttpURL::SchemeHTTPS && port == 443); + + result.ResolvedUrl = TString(NUri::SchemeKindToString(kind)) + "://" + host + + (defaultPort ? "" : ":" + ToString(port)) + + result.ResolvedUrl; + } + } + } + try { + result.Headers = httpIn.Headers(); + result.Data = httpIn.ReadAll(); + ProcessHeaders(result); + // TODO (stanly) try to detect mime-type by content + } catch (const yexception& /* exception */) { + result.Code = WRONG_HTTP_RESPONSE; + } + } + + void ParseHttpResponse(TResult& result, IInputStream& stream, const TString& url) { + THttpURL::EKind kind; + TString host; + ui16 port; + ParseUrl(url, kind, host, port); + ParseHttpResponse(result, stream, kind, host, port); + } + + void ParseUrl(const TStringBuf url, THttpURL::EKind& kind, TString& host, ui16& port) { + using namespace NUri; + + static const int URI_PARSE_FLAGS = + TFeature::FeatureSchemeKnown | TFeature::FeatureConvertHostIDN | TFeature::FeatureEncodeExtendedDelim | TFeature::FeatureEncodePercent; + + TUri uri; + // Cut out url's path to speedup processing. 
+ if (uri.Parse(GetSchemeHostAndPort(url, false, false), URI_PARSE_FLAGS) != TUri::ParsedOK) { + ythrow yexception() << "can't parse url: " << url; + } + + kind = uri.GetScheme(); + host = uri.GetField(TField::FieldHost); + port = uri.GetPort(); + } + +} diff --git a/library/cpp/http/client/fetch/parse.h b/library/cpp/http/client/fetch/parse.h new file mode 100644 index 0000000000..dacfa9bf84 --- /dev/null +++ b/library/cpp/http/client/fetch/parse.h @@ -0,0 +1,14 @@ +#pragma once + +#include "fetch_result.h" +#include <library/cpp/uri/http_url.h> + +namespace NHttpFetcher { + void ParseUrl(const TStringBuf url, THttpURL::EKind& kind, TString& host, ui16& port); + + void ParseHttpResponse(TResult& result, IInputStream& stream, THttpURL::EKind kind, + TStringBuf host, ui16 port); + + void ParseHttpResponse(TResult& result, IInputStream& stream, const TString& url); + +} diff --git a/library/cpp/http/client/fetch/pool.cpp b/library/cpp/http/client/fetch/pool.cpp new file mode 100644 index 0000000000..f0a142eced --- /dev/null +++ b/library/cpp/http/client/fetch/pool.cpp @@ -0,0 +1,57 @@ +#include "pool.h" + +namespace NHttpFetcher { + void TSocketPool::Clear() { + TSocketMap sockets; + + { + auto g(Guard(Lock_)); + Sockets_.swap(sockets); + } + } + + void TSocketPool::Drain(const TDuration timeout) { + const TInstant now = TInstant::Now(); + TVector<THolder<TSocketHandle>> sockets; + + { + auto g(Guard(Lock_)); + for (auto si = Sockets_.begin(); si != Sockets_.end();) { + if (si->second.Touched + timeout < now) { + sockets.push_back(std::move(si->second.Socket)); + Sockets_.erase(si++); + } else { + ++si; + } + } + } + } + + THolder<TSocketPool::TSocketHandle> TSocketPool::GetSocket(const TString& host, const TIpPort port) { + THolder<TSocketPool::TSocketHandle> socket; + + { + auto g(Guard(Lock_)); + auto si = Sockets_.find(std::make_pair(host, port)); + if (si != Sockets_.end()) { + socket = std::move(si->second.Socket); + Sockets_.erase(si); + } + } + + return 
socket; + } + + void TSocketPool::ReturnSocket(const TString& host, const TIpPort port, THolder<TSocketHandle> socket) { + TConnection conn; + + conn.Socket = std::move(socket); + conn.Touched = TInstant::Now(); + + { + auto g(Guard(Lock_)); + Sockets_.emplace(std::make_pair(host, port), std::move(conn)); + } + } + +} diff --git a/library/cpp/http/client/fetch/pool.h b/library/cpp/http/client/fetch/pool.h new file mode 100644 index 0000000000..73c3eda0c6 --- /dev/null +++ b/library/cpp/http/client/fetch/pool.h @@ -0,0 +1,41 @@ +#pragma once + +#include "cosocket.h" + +#include <library/cpp/http/client/ssl/sslsock.h> + +#include <util/generic/hash_multi_map.h> +#include <util/generic/ptr.h> +#include <util/system/mutex.h> + +namespace NHttpFetcher { + class TSocketPool { + public: + using TSocketHandle = TSslSocketHandler<TCoSocketHandler, TSslSocketBase::TFakeLogger>; + + public: + /// Closes all sockets. + void Clear(); + + /// Drops from the pool all sockets that were returned to it more than `timeout` ago. + void Drain(const TDuration timeout); + + /// Returns socket for the given endpoint if available. + THolder<TSocketHandle> GetSocket(const TString& host, const TIpPort port); + + /// Puts socket to the pool. + void ReturnSocket(const TString& host, const TIpPort port, THolder<TSocketHandle> socket); + + private: + struct TConnection { + THolder<TSocketHandle> Socket; + TInstant Touched; + }; + + using TSocketMap = THashMultiMap<std::pair<TString, TIpPort>, TConnection>; + + TMutex Lock_; + TSocketMap Sockets_; + }; + +} |