aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-09-07 15:55:17 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-09-07 15:55:17 +0300
commit52ee0ba26eaa8570fac91f3261d2e206560e9a76 (patch)
tree5e28e28308ce6dbba0b07f7f03e82021d0f630ef
parent2513074d56e5f1f8fb8bd91b22b78d5415a1b3c4 (diff)
downloadydb-52ee0ba26eaa8570fac91f3261d2e206560e9a76.tar.gz
Read headers in seprate result field.
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp42
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h5
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp2
3 files changed, 36 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 35054489a4f..25994c1859b 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -134,8 +134,14 @@ public:
byteRange << Offset + SizeLimit - 1;
}
curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
- curl_easy_setopt(Handle, EMethod::PUT == Method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
- curl_easy_setopt(Handle, EMethod::PUT == Method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this));
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this));
+
+ if (EMethod::PUT == Method) {
+ curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, &WriteHeaderCallback);
+ curl_easy_setopt(Handle, CURLOPT_HEADERDATA, static_cast<void*>(this));
+ }
+
if (Method == EMethod::POST) {
curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, BodySize);
}
@@ -165,6 +171,7 @@ public:
virtual void Done(CURLcode result, long httpResponseCode) = 0;
virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0;
+ virtual size_t WriteHeader(void* contents, size_t size, size_t nmemb) = 0;
virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0;
protected:
void SkipTo(size_t offset) const {
@@ -180,6 +187,13 @@ private:
};
static size_t
+ WriteHeaderCallback(void* contents, size_t size, size_t nmemb, void* userp) {
+ const auto self = static_cast<TEasyCurl*>(userp);
+ const auto res = self->WriteHeader(contents, size, nmemb);
+ return res;
+ };
+
+ static size_t
ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) {
const auto self = static_cast<TEasyCurl*>(userp);
const auto res = self->Read(buffer, size, nmemb);
@@ -208,7 +222,7 @@ public:
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState))
{
Output.Reserve(SizeLimit);
Callbacks.emplace(std::move(callback));
@@ -263,9 +277,9 @@ private:
const std::unique_lock lock(SyncCallbacks);
while (!Callbacks.empty()) {
if (1U == Callbacks.size())
- Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer), httpResponseCode));
+ Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer), httpResponseCode, std::move(Header)));
else
- Callbacks.top()(IHTTPGateway::TContent(Buffer, httpResponseCode));
+ Callbacks.top()(IHTTPGateway::TContent(Buffer, httpResponseCode, Header));
Callbacks.pop();
}
}
@@ -276,15 +290,21 @@ private:
return realsize;
}
+ size_t WriteHeader(void* contents, size_t size, size_t nmemb) final {
+ const auto realsize = size * nmemb;
+ HeaderOutput.Write(contents, realsize);
+ return realsize;
+ }
+
size_t Read(char *buffer, size_t size, size_t nmemb) final {
return Input.Read(buffer, size * nmemb);
}
const size_t SizeLimit;
const TString Data;
- TString Buffer;
+ TString Buffer, Header;
TStringInput Input;
- TStringOutput Output;
+ TStringOutput Output, HeaderOutput;
std::mutex SyncCallbacks;
std::stack<IHTTPGateway::TOnResult> Callbacks;
@@ -388,6 +408,7 @@ private:
return realsize;
}
+ size_t WriteHeader(void*, size_t, size_t) final { return 0ULL; }
size_t Read(char*, size_t, size_t) final { return 0ULL; }
const IHTTPGateway::TOnDownloadStart OnStart;
@@ -791,17 +812,18 @@ TString IHTTPGateway::TContentBase::Extract() {
return std::move(*this);
}
-IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode)
+IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode, TString&& headers)
: TContentBase(std::move(data))
+ , Headers(std::move(headers))
, HttpResponseCode(httpResponseCode)
{}
-IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode)
+IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, const TString& headers)
: TContentBase(data)
+ , Headers(headers)
, HttpResponseCode(httpResponseCode)
{}
-
IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter)
: TContentBase(std::move(data)), Counter(counter)
{
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index e26861cca33..acd74b265a1 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -49,14 +49,15 @@ public:
class TContent : public TContentBase {
friend class TEasyCurl;
public:
- TContent(TString&& data, long httpResponseCode = 0LL);
- TContent(const TString& data, long httpResponseCode = 0LL);
+ TContent(TString&& data, long httpResponseCode = 0LL, TString&& headers = {});
+ TContent(const TString& data, long httpResponseCode = 0LL, const TString& headers = {});
TContent(TContent&& src) = default;
TContent& operator=(TContent&& src) = default;
using TContentBase::Extract;
public:
+ TString Headers;
long HttpResponseCode;
};
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index dc403589bb5..1f2475eaa4c 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -195,7 +195,7 @@ private:
static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, IHTTPGateway::TResult&& response) {
switch (response.index()) {
case 0U: {
- const auto str = std::get<IHTTPGateway::TContent>(std::move(response)).Extract();
+ const auto& str = std::get<IHTTPGateway::TContent>(std::move(response)).Headers;
if (const auto p = str.find("etag: \""); p != TString::npos) {
if (const auto p1 = p + 7, p2 = str.find("\"", p1); p2 != TString::npos) {