aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-03-10 12:57:24 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-03-10 12:57:24 +0300
commit4cee9b5ad6d8fef0473d919c885dd4dbd4e1f528 (patch)
treee40494d07d036948c4ddebabcea35867f9e19ab6
parentd86267870580f2f60993cedf63d494cdcc98ec5e (diff)
downloadydb-4cee9b5ad6d8fef0473d919c885dd4dbd4e1f528.tar.gz
YQ-727 HTTP Gateway interface for dowload as stream. Draft.
ref:fd5120c69f485bea27a5130e17a32df82cccacbe
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp150
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h10
2 files changed, 123 insertions, 37 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 2a0a4649c8..8c3f9557a5 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -17,15 +17,13 @@ public:
using TPtr = std::shared_ptr<TEasyCurl>;
using TWeakPtr = std::weak_ptr<TEasyCurl>;
- TEasyCurl(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback)
- : ExpectedSize(expectedSize), Handle(curl_easy_init()), Data(std::move(data)), Input(Data), Output(Buffer)
+ TEasyCurl(TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, bool withData)
+ : ExpectedSize(expectedSize), Handle(curl_easy_init())
{
- Output.Reserve(expectedSize);
-
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
- curl_easy_setopt(Handle, CURLOPT_POST, Data.empty() ? 0L : 1L);
+ curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L);
curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
- curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(&Output));
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this));
curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway");
curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
@@ -35,25 +33,19 @@ public:
curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Headers);
}
- if (!Data.empty()) {
+ if (withData) {
curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
- curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(&Input));
+ curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this));
}
-
- Callbacks.emplace(std::move(callback));
}
- ~TEasyCurl() {
+ virtual ~TEasyCurl() {
curl_easy_cleanup(Handle);
if (Headers) {
curl_slist_free_all(Headers);
}
}
- static TPtr Make(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) {
- return std::make_shared<TEasyCurl>(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
- }
-
std::size_t GetExpectedSize() const {
return ExpectedSize;
}
@@ -63,7 +55,43 @@ public:
}
// return true if callback successfully added to this work
- bool AddCallback(IHTTPGateway::TOnResult callback) {
+ virtual bool AddCallback(IHTTPGateway::TOnResult callback) = 0;
+ virtual void Fail(const TIssue& error) = 0;
+ virtual void Done(CURLcode result) = 0;
+
+ virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0;
+ virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0;
+
+private:
+ static size_t
+ WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) {
+ return static_cast<TEasyCurl*>(userp)->Write(contents, size, nmemb);
+ };
+
+ static size_t
+ ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) {
+ return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb);
+ };
+
+ const std::size_t ExpectedSize;
+ CURL *const Handle;
+ curl_slist* Headers = nullptr;
+};
+
+class TEasyCurlBuffer : public TEasyCurl {
+public:
+ TEasyCurlBuffer(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback)
+ : TEasyCurl(url, headers, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer)
+ {
+ Output.Reserve(expectedSize);
+ Callbacks.emplace(std::move(callback));
+ }
+
+ static TPtr Make(TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) {
+ return std::make_shared<TEasyCurlBuffer>(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
+ }
+private:
+ bool AddCallback(IHTTPGateway::TOnResult callback) final {
const std::unique_lock lock(SyncCallbacks);
if (Callbacks.empty())
return false;
@@ -71,23 +99,20 @@ public:
return true;
}
- size_t Fail(const TIssue& error) {
+ void Fail(const TIssue& error) final {
TIssues issues{error};
const std::unique_lock lock(SyncCallbacks);
- size_t callbacksSize = Callbacks.size();
while (!Callbacks.empty()) {
Callbacks.top()(issues);
Callbacks.pop();
}
- return callbacksSize;
}
- size_t Done(CURLcode result) {
+ void Done(CURLcode result) final {
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
const std::unique_lock lock(SyncCallbacks);
- size_t callbacksSize = Callbacks.size();
while (!Callbacks.empty()) {
if (1U == Callbacks.size())
Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer)));
@@ -95,24 +120,18 @@ public:
Callbacks.top()(IHTTPGateway::TContent(Buffer));
Callbacks.pop();
}
- return callbacksSize;
}
-private:
- static size_t
- WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) {
+
+ size_t Write(void* contents, size_t size, size_t nmemb) final {
const auto realsize = size * nmemb;
- static_cast<TStringOutput*>(userp)->Write(contents, realsize);
+ Output.Write(contents, realsize);
return realsize;
};
- static size_t
- ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) {
- return static_cast<TStringInput*>(userp)->Read(buffer, size * nmemb);
+ size_t Read(char *buffer, size_t size, size_t nmemb) final {
+ return Input.Read(buffer, size * nmemb);
};
- const std::size_t ExpectedSize;
- CURL *const Handle;
- curl_slist* Headers = nullptr;
const TString Data;
TString Buffer;
TStringInput Input;
@@ -122,6 +141,42 @@ private:
std::stack<IHTTPGateway::TOnResult> Callbacks;
};
+class TEasyCurlStream : public TEasyCurl {
+public:
+ TEasyCurlStream(TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish)
+ : TEasyCurl(url, headers, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish))
+ {
+ }
+
+ static TPtr Make(TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) {
+ return std::make_shared<TEasyCurlStream>(std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish));
+ }
+private:
+ bool AddCallback(IHTTPGateway::TOnResult) final { return false; }
+
+ void Fail(const TIssue& error) final {
+ return OnFinish(TIssues{error});
+ }
+
+ void Done(CURLcode result) final {
+ if (CURLE_OK != result)
+ return Fail(TIssue(curl_easy_strerror(result)));
+
+ return OnFinish(std::nullopt);
+ }
+
+ size_t Write(void* contents, size_t size, size_t nmemb) final {
+ const auto realsize = size * nmemb;
+ OnNewData(IHTTPGateway::TContent(TString(static_cast<char*>(contents), realsize)));
+ return realsize;
+ };
+
+ size_t Read(char*, size_t, size_t) final { return 0ULL; }
+
+ const IHTTPGateway::TOnNewDataPart OnNewData;
+ const IHTTPGateway::TOnDowloadFinsh OnFinish;
+};
+
using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
class TKeyHash {
@@ -269,7 +324,7 @@ private:
Requests.clear();
}
if (!isRetry && easy) {
- InFlight->Sub(easy->Done(result));
+ easy->Done(result);
}
}
@@ -291,7 +346,7 @@ private:
const TIssue error(curl_multi_strerror(result));
while (!works.empty()) {
- InFlight->Sub(works.top()->Fail(error));
+ works.top()->Fail(error);
works.pop();
}
}
@@ -305,7 +360,6 @@ private:
IRetryPolicy<long>::TPtr retryPolicy) final
{
Rps->Inc();
- InFlight->Inc();
const std::unique_lock lock(Sync);
auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)];
@@ -314,13 +368,26 @@ private:
if (easy->AddCallback(std::move(callback)))
return;
- auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
+ InFlight->Inc();
+ auto easy = TEasyCurlBuffer::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
entry = easy;
Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState()));
Await.emplace(std::move(easy));
Wakeup(expectedSize);
}
+ void Download(
+ TString url,
+ THeaders headers,
+ std::size_t expectedSize,
+ TOnNewDataPart onNewData,
+ TOnDowloadFinsh onFinish) final
+ {
+ auto easy = TEasyCurlStream::Make(std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish));
+ Await.emplace(std::move(easy));
+ Wakeup(expectedSize);
+ }
+
void OnRetry(TEasyCurl::TPtr easy) {
const std::unique_lock lock(Sync);
const std::size_t expectedSize = easy->GetExpectedSize();
@@ -427,10 +494,19 @@ private:
if (const auto& easy = entry.first.lock())
if (easy->AddCallback(std::move(callback)))
return;
- auto easy = TEasyCurl::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
+ auto easy = TEasyCurlBuffer::Make(std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
entry = std::make_pair(TEasyCurl::TWeakPtr(easy), std::thread(&THTTPEasyGateway::Perform, weak_from_this(), easy));
}
+ virtual void Download(
+ TString ,
+ THeaders ,
+ std::size_t ,
+ TOnNewDataPart ,
+ TOnDowloadFinsh ) {
+
+ }
+
void Done(const TEasyCurl::TPtr& easy, CURLcode result) {
const std::unique_lock lock(Sync);
easy->Done(result);
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 506b7d774e..991f0b2768 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -58,6 +58,16 @@ public:
TString data = {},
IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()
) = 0;
+
+ using TOnNewDataPart = std::function<void(TContent&&)>;
+ using TOnDowloadFinsh = std::function<void(std::optional<TIssues>)>;
+
+ virtual void Download(
+ TString url,
+ THeaders headers,
+ std::size_t expectedSize,
+ TOnNewDataPart onNewData,
+ TOnDowloadFinsh onFinish) = 0;
};
}