aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-04-11 20:21:13 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-04-11 20:21:13 +0300
commit77bee32506fb84f26f504842bf8ec90860ba741c (patch)
tree44c38d93358e3a171b071800845ca2b729e5d778
parente445e8e0dc6f3796a1c8a288a302fa04f3ee7048 (diff)
downloadydb-77bee32506fb84f26f504842bf8ec90860ba741c.tar.gz
YQ-727 Interface for download from offset.
ref:c1ad03a5abb6458fd8e6a4bae72f7bc549831dfd
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp3
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp40
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp2
4 files changed, 27 insertions, 19 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 194bf2c6e75..fb7574fe391 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -60,6 +60,7 @@ public:
TString ,
IHTTPGateway::THeaders ,
std::size_t ,
+ std::size_t ,
IHTTPGateway::TOnNewDataPart ,
IHTTPGateway::TOnDowloadFinsh ) {
}
@@ -89,4 +90,4 @@ IHTTPMockGateway::TPtr IHTTPMockGateway::Make() {
return std::make_shared<THTTPMockGateway>();
}
-} \ No newline at end of file
+}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index e142218a51b..324aacd7653 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -18,8 +18,8 @@ public:
using TPtr = std::shared_ptr<TEasyCurl>;
using TWeakPtr = std::weak_ptr<TEasyCurl>;
- TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, bool withData)
- : ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter)
+ TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData)
+ : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter)
{
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L);
@@ -34,6 +34,10 @@ public:
curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Headers);
}
+ if (Offset) {
+ curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str());
+ }
+
if (withData) {
curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this));
@@ -76,6 +80,7 @@ private:
return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb);
};
+ const std::size_t Offset;
const std::size_t ExpectedSize;
CURL *const Handle;
curl_slist* Headers = nullptr;
@@ -84,15 +89,15 @@ private:
class TEasyCurlBuffer : public TEasyCurl {
public:
- TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback)
- : TEasyCurl(counter, url, headers, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer)
+ TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback)
+ : TEasyCurl(counter, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer)
{
Output.Reserve(expectedSize);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback) {
- return std::make_shared<TEasyCurlBuffer>(counter, std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) {
+ return std::make_shared<TEasyCurlBuffer>(counter, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
}
private:
bool AddCallback(IHTTPGateway::TOnResult callback) final {
@@ -150,13 +155,13 @@ private:
class TEasyCurlStream : public TEasyCurl {
public:
- TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish)
- : TEasyCurl(counter, url, headers, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish))
+ TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish)
+ : TEasyCurl(counter, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish))
{
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) {
- return std::make_shared<TEasyCurlStream>(counter, std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish));
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDowloadFinsh onFinish) {
+ return std::make_shared<TEasyCurlStream>(counter, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish));
}
private:
bool AddCallback(IHTTPGateway::TOnResult) final { return false; }
@@ -184,16 +189,16 @@ private:
const IHTTPGateway::TOnDowloadFinsh OnFinish;
};
-using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
+using TKeyType = std::tuple<TString, std::size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
class TKeyHash {
public:
TKeyHash() : Hash(), HashPtr() {}
size_t operator()(const TKeyType& key) const {
- const auto& headers = std::get<1U>(key);
- auto initHash = CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key)));
- initHash = CombineHashes(HashPtr(std::get<3U>(key)), initHash);
+ const auto& headers = std::get<2U>(key);
+ auto initHash = CombineHashes(CombineHashes(Hash(std::get<0U>(key)), std::get<1U>(key)), Hash(std::get<3U>(key)));
+ initHash = CombineHashes(HashPtr(std::get<4U>(key)), initHash);
return std::accumulate(headers.cbegin(), headers.cend(), initHash,
[this](size_t hash, const TString& item) { return CombineHashes(hash, Hash(item)); });
}
@@ -369,13 +374,13 @@ private:
Rps->Inc();
const std::unique_lock lock(Sync);
- auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)];
+ auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)];
StraightInFlight->Set(Requests.size());
if (const auto& easy = entry.lock())
if (easy->AddCallback(callback))
return;
- auto easy = TEasyCurlBuffer::Make(InFlight, std::move(url), std::move(data), std::move(headers), expectedSize, std::move(callback));
+ auto easy = TEasyCurlBuffer::Make(InFlight, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback));
entry = easy;
Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState()));
Await.emplace(std::move(easy));
@@ -385,11 +390,12 @@ private:
void Download(
TString url,
THeaders headers,
+ std::size_t offset,
std::size_t expectedSize,
TOnNewDataPart onNewData,
TOnDowloadFinsh onFinish) final
{
- auto easy = TEasyCurlStream::Make(InFlight, std::move(url), std::move(headers), expectedSize, std::move(onNewData), std::move(onFinish));
+ auto easy = TEasyCurlStream::Make(InFlight, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
Await.emplace(std::move(easy));
Wakeup(expectedSize);
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index f5240a26cf5..c21ef4cd1bb 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -69,6 +69,7 @@ public:
virtual void Download(
TString url,
THeaders headers,
+ std::size_t offset,
std::size_t expectedSize,
TOnNewDataPart onNewData,
TOnDowloadFinsh onFinish) = 0;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index f52ef28d860..717d126796e 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -392,7 +392,7 @@ private:
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) {
Gateway->Download(Url + Path,
- Headers, ExpectedSize,
+ Headers, 0ULL, ExpectedSize,
std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1),
std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1));