diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-07 15:55:17 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-07 15:55:17 +0300 |
commit | 52ee0ba26eaa8570fac91f3261d2e206560e9a76 (patch) | |
tree | 5e28e28308ce6dbba0b07f7f03e82021d0f630ef | |
parent | 2513074d56e5f1f8fb8bd91b22b78d5415a1b3c4 (diff) | |
download | ydb-52ee0ba26eaa8570fac91f3261d2e206560e9a76.tar.gz |
Read headers in seprate result field.
3 files changed, 36 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 35054489a4f..25994c1859b 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -134,8 +134,14 @@ public: byteRange << Offset + SizeLimit - 1; } curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str()); - curl_easy_setopt(Handle, EMethod::PUT == Method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); - curl_easy_setopt(Handle, EMethod::PUT == Method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this)); + curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); + curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this)); + + if (EMethod::PUT == Method) { + curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, &WriteHeaderCallback); + curl_easy_setopt(Handle, CURLOPT_HEADERDATA, static_cast<void*>(this)); + } + if (Method == EMethod::POST) { curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, BodySize); } @@ -165,6 +171,7 @@ public: virtual void Done(CURLcode result, long httpResponseCode) = 0; virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0; + virtual size_t WriteHeader(void* contents, size_t size, size_t nmemb) = 0; virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0; protected: void SkipTo(size_t offset) const { @@ -180,6 +187,13 @@ private: }; static size_t + WriteHeaderCallback(void* contents, size_t size, size_t nmemb, void* userp) { + const auto self = static_cast<TEasyCurl*>(userp); + const auto res = self->WriteHeader(contents, size, nmemb); + return res; + }; + + static size_t ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) { const auto self = static_cast<TEasyCurl*>(userp); const auto res = self->Read(buffer, size, nmemb); @@ -208,7 +222,7 @@ public: using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState)) { Output.Reserve(SizeLimit); Callbacks.emplace(std::move(callback)); @@ -263,9 +277,9 @@ private: const std::unique_lock lock(SyncCallbacks); while (!Callbacks.empty()) { if (1U == Callbacks.size()) - Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer), httpResponseCode)); + Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer), httpResponseCode, std::move(Header))); else - Callbacks.top()(IHTTPGateway::TContent(Buffer, httpResponseCode)); + Callbacks.top()(IHTTPGateway::TContent(Buffer, httpResponseCode, Header)); Callbacks.pop(); } } @@ -276,15 +290,21 @@ private: return realsize; } + size_t WriteHeader(void* contents, size_t size, size_t nmemb) final { + const auto realsize = size * nmemb; + HeaderOutput.Write(contents, realsize); + return realsize; + } + size_t Read(char *buffer, size_t size, size_t nmemb) final { return Input.Read(buffer, size * nmemb); } const size_t SizeLimit; const TString Data; - TString Buffer; + TString Buffer, Header; TStringInput Input; - TStringOutput Output; + TStringOutput Output, HeaderOutput; std::mutex SyncCallbacks; std::stack<IHTTPGateway::TOnResult> Callbacks; @@ -388,6 +408,7 @@ private: return realsize; } + size_t WriteHeader(void*, size_t, size_t) final { return 0ULL; } size_t Read(char*, size_t, size_t) final { return 0ULL; } const IHTTPGateway::TOnDownloadStart OnStart; @@ -791,17 +812,18 @@ TString IHTTPGateway::TContentBase::Extract() { return std::move(*this); } -IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode) +IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode, TString&& headers) : TContentBase(std::move(data)) + , Headers(std::move(headers)) , HttpResponseCode(httpResponseCode) {} -IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode) +IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, const TString& headers) : TContentBase(data) + , Headers(headers) , HttpResponseCode(httpResponseCode) {} - IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter) : TContentBase(std::move(data)), Counter(counter) { diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index e26861cca33..acd74b265a1 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -49,14 +49,15 @@ public: class TContent : public TContentBase { friend class TEasyCurl; public: - TContent(TString&& data, long httpResponseCode = 0LL); - TContent(const TString& data, long httpResponseCode = 0LL); + TContent(TString&& data, long httpResponseCode = 0LL, TString&& headers = {}); + TContent(const TString& data, long httpResponseCode = 0LL, const TString& headers = {}); TContent(TContent&& src) = default; TContent& operator=(TContent&& src) = default; using TContentBase::Extract; public: + TString Headers; long HttpResponseCode; }; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index dc403589bb5..1f2475eaa4c 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -195,7 +195,7 @@ private: static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, IHTTPGateway::TResult&& response) { switch (response.index()) { case 0U: { - const auto str = std::get<IHTTPGateway::TContent>(std::move(response)).Extract(); + const auto& str = std::get<IHTTPGateway::TContent>(std::move(response)).Headers; if (const auto p = str.find("etag: \""); p != TString::npos) { if (const auto p1 = p + 7, p2 = str.find("\"", p1); p2 != TString::npos) { |