aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-09-09 16:07:32 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-09-09 16:07:32 +0300
commitc9d949de0a10fb0ac657d5cf8046f1729cdbe282 (patch)
treea4d681afee19c44c257bd5004d64921517e932c2
parent42578558983241f5b879a6453387a47c4ada2d27 (diff)
downloadydb-c9d949de0a10fb0ac657d5cf8046f1729cdbe282.tar.gz
Fix SizeLimit on streaming download.
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp44
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp9
2 files changed, 24 insertions, 29 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index d54197b825c..56fc30dad17 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -81,8 +81,8 @@ public:
PUT
};
- TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 sizeLimit = 0, const TCurlInitConfig& config = TCurlInitConfig())
- : Headers(headers), Method(method), Offset(offset), BodySize(bodySize), SizeLimit(sizeLimit), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url)
+ TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t sizeLimit = 0, size_t bodySize = 0, const TCurlInitConfig& config = TCurlInitConfig())
+ : Headers(headers), Method(method), Offset(offset), SizeLimit(sizeLimit), BodySize(bodySize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url)
{
InitHandles();
Counter->Inc();
@@ -132,12 +132,7 @@ public:
std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2)));
curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, CurlHeaders);
}
- TStringBuilder byteRange;
- byteRange << Offset << "-";
- if (SizeLimit) {
- byteRange << Offset + SizeLimit - 1;
- }
- curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
+
curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this));
@@ -154,6 +149,7 @@ public:
curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this));
}
+ SkipTo(0ULL);
}
void FreeHandles() {
@@ -177,9 +173,17 @@ public:
virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0;
virtual size_t WriteHeader(void* contents, size_t size, size_t nmemb) = 0;
virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0;
+
+ size_t GetSizeLimit() const { return SizeLimit; }
protected:
void SkipTo(size_t offset) const {
- curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset + offset) += '-').c_str());
+ if (offset || Offset || SizeLimit) {
+ TStringBuilder byteRange;
+ byteRange << Offset + offset << '-';
+ if (SizeLimit)
+ byteRange << Offset + SizeLimit - 1;
+ curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
+ }
}
private:
static size_t
@@ -208,8 +212,8 @@ private:
const IHTTPGateway::THeaders Headers;
const EMethod Method;
const size_t Offset;
+ const size_t SizeLimit;
const size_t BodySize;
- const ui64 SizeLimit;
CURL* Handle = nullptr;
curl_slist* CurlHeaders = nullptr;
const ::NMonitoring::TDynamicCounters::TCounterPtr Counter;
@@ -226,9 +230,9 @@ public:
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, sizeLimit, data.size(), std::move(config)), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState))
{
- Output.Reserve(SizeLimit);
+ Output.Reserve(sizeLimit);
Callbacks.emplace(std::move(callback));
}
@@ -236,10 +240,6 @@ public:
return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config));
}
- size_t GetSizeLimit() const {
- return SizeLimit;
- }
-
// return true if callback successfully added to this work
bool AddCallback(IHTTPGateway::TOnResult callback) {
const std::unique_lock lock(SyncCallbacks);
@@ -258,8 +258,9 @@ public:
void Reset() {
Buffer.clear();
+ Header.clear();
TStringOutput(Buffer).Swap(Output);
- Output.Reserve(SizeLimit);
+ TStringOutput(Header).Swap(HeaderOutput);
TStringInput(Data).Swap(Input);
FreeHandles();
InitHandles();
@@ -304,7 +305,6 @@ private:
return Input.Read(buffer, size * nmemb);
}
- const size_t SizeLimit;
const TString Data;
TString Buffer, Header;
TStringInput Input;
@@ -331,9 +331,8 @@ public:
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, sizeLimit, 0ULL, std::move(config))
, OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
- , Offset(offset), SizeLimit(sizeLimit)
{}
static TPtr Make(
@@ -368,7 +367,7 @@ public:
}
if (Working != Counter->load() < buffersSize) {
- if (!Working && SizeLimit && Offset + Position >= SizeLimit) {
+ if (const auto limit = GetSizeLimit(); !Working && limit && Position >= limit) {
OnFinish(TIssues());
return EAction::Drop;
}
@@ -424,8 +423,6 @@ private:
size_t Position = 0ULL;
bool Cancelled = false;
bool StreamStarted = false;
- size_t Offset = 0;
- size_t SizeLimit = 0;
};
using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
@@ -551,7 +548,6 @@ private:
#ifdef PROFILE_MEMORY_ALLOCATIONS
NProfiling::SetThreadAllocTag(NProfiling::MakeTag("HTTP_PERFORM"));
#endif
-
OutputSize.store(0ULL);
for (size_t handlers = 0U; !IsStopped;) {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 92e22875043..dcc55359817 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -272,15 +272,13 @@ struct TRetryStuff {
TString url,
const IHTTPGateway::THeaders& headers,
std::size_t sizeLimit
- ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), SizeLimit(sizeLimit), Offset(0U), RetryState(GetHTTPDefaultRetryPolicy()->CreateRetryState())
+ ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), Offset(0U), SizeLimit(sizeLimit), RetryState(GetHTTPDefaultRetryPolicy()->CreateRetryState())
{}
const IHTTPGateway::TPtr Gateway;
const TString Url;
const IHTTPGateway::THeaders Headers;
- const std::size_t SizeLimit;
-
- std::size_t Offset = 0U;
+ std::size_t Offset, SizeLimit;
const IRetryPolicy<long>::IRetryState::TPtr RetryState;
IHTTPGateway::TCancelHook CancelHook;
TMaybe<TDuration> NextRetryDelay;
@@ -344,6 +342,7 @@ public:
if (200L == HttpResponseCode || 206L == HttpResponseCode) {
value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
RetryStuff->Offset += value.size();
+ RetryStuff->SizeLimit -= value.size();
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
} else if (HttpResponseCode && !RetryStuff->NextRetryDelay) {
if (ErrorText.size() < 256_KB)
@@ -485,7 +484,7 @@ private:
}
}
- if (retryStuff->NextRetryDelay && retryStuff->Offset < retryStuff->SizeLimit)
+ if (retryStuff->NextRetryDelay && retryStuff->SizeLimit > 0ULL)
actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
else
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished));