diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-04-11 20:21:13 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-04-11 20:21:13 +0300 |
commit | 77bee32506fb84f26f504842bf8ec90860ba741c (patch) | |
tree | 44c38d93358e3a171b071800845ca2b729e5d778 | |
parent | e445e8e0dc6f3796a1c8a288a302fa04f3ee7048 (diff) | |
download | ydb-77bee32506fb84f26f504842bf8ec90860ba741c.tar.gz |
YQ-727 Interface for download from offset.
ref:c1ad03a5abb6458fd8e6a4bae72f7bc549831dfd
4 files changed, 27 insertions, 19 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 194bf2c6e75..fb7574fe391 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -60,6 +60,7 @@ public: TString , IHTTPGateway::THeaders , std::size_t , + std::size_t , IHTTPGateway::TOnNewDataPart , IHTTPGateway::TOnDowloadFinsh ) { } @@ -89,4 +90,4 @@ IHTTPMockGateway::TPtr IHTTPMockGateway::Make() { return std::make_shared<THTTPMockGateway>(); } -}
\ No newline at end of file +} diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index e142218a51b..324aacd7653 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -18,8 +18,8 @@ public: using TPtr = std::shared_ptr<TEasyCurl>; using TWeakPtr = std::weak_ptr<TEasyCurl>; - TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, bool withData) - : ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter) + TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData) + : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter) { curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L); @@ -34,6 +34,10 @@ public: curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Headers); } + if (Offset) { + curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str()); + } + if (withData) { curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this)); @@ -76,6 +80,7 @@ private: return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb); }; + const std::size_t Offset; const std::size_t ExpectedSize; CURL *const Handle; curl_slist* Headers = nullptr; @@ -84,15 +89,15 @@ private: class TEasyCurlBuffer : public TEasyCurl { public: - TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) - : TEasyCurl(counter, url, headers, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer) + TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) + : TEasyCurl(counter, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer) { Output.Reserve(expectedSize); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { - return std::make_shared<TEasyCurlBuffer>(counter, std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { + return std::make_shared<TEasyCurlBuffer>(counter, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); } private: bool AddCallback(IHTTPGateway::TOnResult callback) final { @@ -150,13 +155,13 @@ private: class TEasyCurlStream : public TEasyCurl { public: - TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) - : TEasyCurl(counter, url, headers, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)) + TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) + : TEasyCurl(counter, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)) { } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) { - return std::make_shared<TEasyCurlStream>(counter, std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish)); + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) { + return std::make_shared<TEasyCurlStream>(counter, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish)); } private: bool AddCallback(IHTTPGateway::TOnResult) final { return false; } @@ -184,16 +189,16 @@ private: const IHTTPGateway::TOnDowloadFinsh OnFinish; }; -using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; +using TKeyType = std::tuple<TString, std::size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; class TKeyHash { public: TKeyHash() : Hash(), HashPtr() {} size_t operator()(const TKeyType& key) const { - const auto& headers = std::get<1U>(key); - auto initHash = CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key))); - initHash = CombineHashes(HashPtr(std::get<3U>(key)), initHash); + const auto& headers = std::get<2U>(key); + auto initHash = CombineHashes(CombineHashes(Hash(std::get<0U>(key)), std::get<1U>(key)), Hash(std::get<3U>(key))); + initHash = CombineHashes(HashPtr(std::get<4U>(key)), initHash); return std::accumulate(headers.cbegin(), headers.cend(), initHash, [this](size_t hash, const TString& item) { return CombineHashes(hash, Hash(item)); }); } @@ -369,13 +374,13 @@ private: Rps->Inc(); const std::unique_lock lock(Sync); - auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)]; + auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; StraightInFlight->Set(Requests.size()); if (const auto& easy = entry.lock()) if (easy->AddCallback(callback)) return; - auto easy = TEasyCurlBuffer::Make(InFlight, std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback)); + auto easy = TEasyCurlBuffer::Make(InFlight, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback)); entry = easy; Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState())); Await.emplace(std::move(easy)); @@ -385,11 +390,12 @@ private: void Download( TString url, THeaders headers, + std::size_t offset, std::size_t expectedSize, TOnNewDataPart onNewData, TOnDowloadFinsh onFinish) final { - auto easy = TEasyCurlStream::Make(InFlight, std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish)); + auto easy = TEasyCurlStream::Make(InFlight, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); Await.emplace(std::move(easy)); Wakeup(expectedSize); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index f5240a26cf5..c21ef4cd1bb 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -69,6 +69,7 @@ public: virtual void Download( TString url, THeaders headers, + std::size_t offset, std::size_t expectedSize, TOnNewDataPart onNewData, TOnDowloadFinsh onFinish) = 0; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index f52ef28d860..717d126796e 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -392,7 +392,7 @@ private: TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) { Gateway->Download(Url + Path, - Headers, ExpectedSize, + Headers, 0ULL, ExpectedSize, std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1), std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1)); |