diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-03-10 12:57:24 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-03-10 12:57:24 +0300 |
commit | 4cee9b5ad6d8fef0473d919c885dd4dbd4e1f528 (patch) | |
tree | e40494d07d036948c4ddebabcea35867f9e19ab6 | |
parent | d86267870580f2f60993cedf63d494cdcc98ec5e (diff) | |
download | ydb-4cee9b5ad6d8fef0473d919c885dd4dbd4e1f528.tar.gz |
YQ-727 HTTP Gateway interface for dowload as stream. Draft.
ref:fd5120c69f485bea27a5130e17a32df82cccacbe
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 150 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h | 10 |
2 files changed, 123 insertions, 37 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 2a0a4649c8..8c3f9557a5 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -17,15 +17,13 @@ public: using TPtr = std::shared_ptr<TEasyCurl>; using TWeakPtr = std::weak_ptr<TEasyCurl>; - TEasyCurl(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) - : ExpectedSize(expectedSize), Handle(curl_easy_init()), Data(std::move(data)), Input(Data), Output(Buffer) + TEasyCurl(TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, bool withData) + : ExpectedSize(expectedSize), Handle(curl_easy_init()) { - Output.Reserve(expectedSize); - curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); - curl_easy_setopt(Handle, CURLOPT_POST, Data.empty() ? 0L : 1L); + curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L); curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); - curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(&Output)); + curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this)); curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway"); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); @@ -35,25 +33,19 @@ public: curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Headers); } - if (!Data.empty()) { + if (withData) { curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback); - curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(&Input)); + curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this)); } - - Callbacks.emplace(std::move(callback)); } - ~TEasyCurl() { + virtual ~TEasyCurl() { curl_easy_cleanup(Handle); if (Headers) { curl_slist_free_all(Headers); } } - static TPtr Make(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { - return std::make_shared<TEasyCurl>(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); - } - std::size_t GetExpectedSize() const { return ExpectedSize; } @@ -63,7 +55,43 @@ public: } // return true if callback successfully added to this work - bool AddCallback(IHTTPGateway::TOnResult callback) { + virtual bool AddCallback(IHTTPGateway::TOnResult callback) = 0; + virtual void Fail(const TIssue& error) = 0; + virtual void Done(CURLcode result) = 0; + + virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0; + virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0; + +private: + static size_t + WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) { + return static_cast<TEasyCurl*>(userp)->Write(contents, size, nmemb); + }; + + static size_t + ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) { + return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb); + }; + + const std::size_t ExpectedSize; + CURL *const Handle; + curl_slist* Headers = nullptr; +}; + +class TEasyCurlBuffer : public TEasyCurl { +public: + TEasyCurlBuffer(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) + : TEasyCurl(url, headers, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer) + { + Output.Reserve(expectedSize); + Callbacks.emplace(std::move(callback)); + } + + static TPtr Make(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { + return std::make_shared<TEasyCurlBuffer>(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); + } +private: + bool AddCallback(IHTTPGateway::TOnResult callback) final { const std::unique_lock lock(SyncCallbacks); if (Callbacks.empty()) return false; @@ -71,23 +99,20 @@ public: return true; } - size_t Fail(const TIssue& error) { + void Fail(const TIssue& error) final { TIssues issues{error}; const std::unique_lock lock(SyncCallbacks); - size_t callbacksSize = Callbacks.size(); while (!Callbacks.empty()) { Callbacks.top()(issues); Callbacks.pop(); } - return callbacksSize; } - size_t Done(CURLcode result) { + void Done(CURLcode result) final { if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); const std::unique_lock lock(SyncCallbacks); - size_t callbacksSize = Callbacks.size(); while (!Callbacks.empty()) { if (1U == Callbacks.size()) Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer))); @@ -95,24 +120,18 @@ public: Callbacks.top()(IHTTPGateway::TContent(Buffer)); Callbacks.pop(); } - return callbacksSize; } -private: - static size_t - WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) { + + size_t Write(void* contents, size_t size, size_t nmemb) final { const auto realsize = size * nmemb; - static_cast<TStringOutput*>(userp)->Write(contents, realsize); + Output.Write(contents, realsize); return realsize; }; - static size_t - ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) { - return static_cast<TStringInput*>(userp)->Read(buffer, size * nmemb); + size_t Read(char *buffer, size_t size, size_t nmemb) final { + return Input.Read(buffer, size * nmemb); }; - const std::size_t ExpectedSize; - CURL *const Handle; - curl_slist* Headers = nullptr; const TString Data; TString Buffer; TStringInput Input; @@ -122,6 +141,42 @@ private: std::stack<IHTTPGateway::TOnResult> Callbacks; }; +class TEasyCurlStream : public TEasyCurl { +public: + TEasyCurlStream(TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) + : TEasyCurl(url, headers, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)) + { + } + + static TPtr Make(TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) { + return std::make_shared<TEasyCurlStream>(std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish)); + } +private: + bool AddCallback(IHTTPGateway::TOnResult) final { return false; } + + void Fail(const TIssue& error) final { + return OnFinish(TIssues{error}); + } + + void Done(CURLcode result) final { + if (CURLE_OK != result) + return Fail(TIssue(curl_easy_strerror(result))); + + return OnFinish(std::nullopt); + } + + size_t Write(void* contents, size_t size, size_t nmemb) final { + const auto realsize = size * nmemb; + OnNewData(IHTTPGateway::TContent(TString(static_cast<char*>(contents), realsize))); + return realsize; + }; + + size_t Read(char*, size_t, size_t) final { return 0ULL; } + + const IHTTPGateway::TOnNewDataPart OnNewData; + const IHTTPGateway::TOnDowloadFinsh OnFinish; +}; + using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; class TKeyHash { @@ -269,7 +324,7 @@ private: Requests.clear(); } if (!isRetry && easy) { - InFlight->Sub(easy->Done(result)); + easy->Done(result); } } @@ -291,7 +346,7 @@ private: const TIssue error(curl_multi_strerror(result)); while (!works.empty()) { - InFlight->Sub(works.top()->Fail(error)); + works.top()->Fail(error); works.pop(); } } @@ -305,7 +360,6 @@ private: IRetryPolicy<long>::TPtr retryPolicy) final { Rps->Inc(); - InFlight->Inc(); const std::unique_lock lock(Sync); auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)]; @@ -314,13 +368,26 @@ private: if (easy->AddCallback(std::move(callback))) return; - auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); + InFlight->Inc(); + auto easy = TEasyCurlBuffer::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); entry = easy; Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState())); Await.emplace(std::move(easy)); Wakeup(expectedSize); } + void Download( + TString url, + THeaders headers, + std::size_t expectedSize, + TOnNewDataPart onNewData, + TOnDowloadFinsh onFinish) final + { + auto easy = TEasyCurlStream::Make(std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish)); + Await.emplace(std::move(easy)); + Wakeup(expectedSize); + } + void OnRetry(TEasyCurl::TPtr easy) { const std::unique_lock lock(Sync); const std::size_t expectedSize = easy->GetExpectedSize(); @@ -427,10 +494,19 @@ private: if (const auto& easy = entry.first.lock()) if (easy->AddCallback(std::move(callback))) return; - auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); + auto easy = TEasyCurlBuffer::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); entry = std::make_pair(TEasyCurl::TWeakPtr(easy), std::thread(&THTTPEasyGateway::Perform, weak_from_this(), easy)); } + virtual void Download( + TString , + THeaders , + std::size_t , + TOnNewDataPart , + TOnDowloadFinsh ) { + + } + void Done(const TEasyCurl::TPtr& easy, CURLcode result) { const std::unique_lock lock(Sync); easy->Done(result); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 506b7d774e..991f0b2768 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -58,6 +58,16 @@ public: TString data = {}, IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy() ) = 0; + + using TOnNewDataPart = std::function<void(TContent&&)>; + using TOnDowloadFinsh = std::function<void(std::optional<TIssues>)>; + + virtual void Download( + TString url, + THeaders headers, + std::size_t expectedSize, + TOnNewDataPart onNewData, + TOnDowloadFinsh onFinish) = 0; }; } |