diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-09 16:07:32 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-09 16:07:32 +0300 |
commit | c9d949de0a10fb0ac657d5cf8046f1729cdbe282 (patch) | |
tree | a4d681afee19c44c257bd5004d64921517e932c2 | |
parent | 42578558983241f5b879a6453387a47c4ada2d27 (diff) | |
download | ydb-c9d949de0a10fb0ac657d5cf8046f1729cdbe282.tar.gz |
Fix SizeLimit on streaming download.
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 44 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 9 |
2 files changed, 24 insertions, 29 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index d54197b825c..56fc30dad17 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -81,8 +81,8 @@ public: PUT }; - TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 sizeLimit = 0, const TCurlInitConfig& config = TCurlInitConfig()) - : Headers(headers), Method(method), Offset(offset), BodySize(bodySize), SizeLimit(sizeLimit), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url) + TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t sizeLimit = 0, size_t bodySize = 0, const TCurlInitConfig& config = TCurlInitConfig()) + : Headers(headers), Method(method), Offset(offset), SizeLimit(sizeLimit), BodySize(bodySize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url) { InitHandles(); Counter->Inc(); @@ -132,12 +132,7 @@ public: std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2))); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, CurlHeaders); } - TStringBuilder byteRange; - byteRange << Offset << "-"; - if (SizeLimit) { - byteRange << Offset + SizeLimit - 1; - } - curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str()); + curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this)); @@ -154,6 +149,7 @@ public: curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this)); } + SkipTo(0ULL); } void FreeHandles() { @@ -177,9 +173,17 @@ public: virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0; virtual size_t WriteHeader(void* contents, size_t size, size_t nmemb) = 0; virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0; + + size_t GetSizeLimit() const { return SizeLimit; } protected: void SkipTo(size_t offset) const { - curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset + offset) += '-').c_str()); + if (offset || Offset || SizeLimit) { + TStringBuilder byteRange; + byteRange << Offset + offset << '-'; + if (SizeLimit) + byteRange << Offset + SizeLimit - 1; + curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str()); + } } private: static size_t @@ -208,8 +212,8 @@ private: const IHTTPGateway::THeaders Headers; const EMethod Method; const size_t Offset; + const size_t SizeLimit; const size_t BodySize; - const ui64 SizeLimit; CURL* Handle = nullptr; curl_slist* CurlHeaders = nullptr; const ::NMonitoring::TDynamicCounters::TCounterPtr Counter; @@ -226,9 +230,9 @@ public: using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, sizeLimit, data.size(), std::move(config)), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState)) { - Output.Reserve(SizeLimit); + Output.Reserve(sizeLimit); Callbacks.emplace(std::move(callback)); } @@ -236,10 +240,6 @@ public: return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config)); } - size_t GetSizeLimit() const { - return SizeLimit; - } - // return true if callback successfully added to this work bool AddCallback(IHTTPGateway::TOnResult callback) { const std::unique_lock lock(SyncCallbacks); @@ -258,8 +258,9 @@ public: void Reset() { Buffer.clear(); + Header.clear(); TStringOutput(Buffer).Swap(Output); - Output.Reserve(SizeLimit); + TStringOutput(Header).Swap(HeaderOutput); TStringInput(Data).Swap(Input); FreeHandles(); InitHandles(); @@ -304,7 +305,6 @@ private: return Input.Read(buffer, size * nmemb); } - const size_t SizeLimit; const TString Data; TString Buffer, Header; TStringInput Input; @@ -331,9 +331,8 @@ public: IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, sizeLimit, 0ULL, std::move(config)) , OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) - , Offset(offset), SizeLimit(sizeLimit) {} static TPtr Make( @@ -368,7 +367,7 @@ public: } if (Working != Counter->load() < buffersSize) { - if (!Working && SizeLimit && Offset + Position >= SizeLimit) { + if (const auto limit = GetSizeLimit(); !Working && limit && Position >= limit) { OnFinish(TIssues()); return EAction::Drop; } @@ -424,8 +423,6 @@ private: size_t Position = 0ULL; bool Cancelled = false; bool StreamStarted = false; - size_t Offset = 0; - size_t SizeLimit = 0; }; using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; @@ -551,7 +548,6 @@ private: #ifdef PROFILE_MEMORY_ALLOCATIONS NProfiling::SetThreadAllocTag(NProfiling::MakeTag("HTTP_PERFORM")); #endif - OutputSize.store(0ULL); for (size_t handlers = 0U; !IsStopped;) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 92e22875043..dcc55359817 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -272,15 +272,13 @@ struct TRetryStuff { TString url, const IHTTPGateway::THeaders& headers, std::size_t sizeLimit - ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), SizeLimit(sizeLimit), Offset(0U), RetryState(GetHTTPDefaultRetryPolicy()->CreateRetryState()) + ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), Offset(0U), SizeLimit(sizeLimit), RetryState(GetHTTPDefaultRetryPolicy()->CreateRetryState()) {} const IHTTPGateway::TPtr Gateway; const TString Url; const IHTTPGateway::THeaders Headers; - const std::size_t SizeLimit; - - std::size_t Offset = 0U; + std::size_t Offset, SizeLimit; const IRetryPolicy<long>::IRetryState::TPtr RetryState; IHTTPGateway::TCancelHook CancelHook; TMaybe<TDuration> NextRetryDelay; @@ -344,6 +342,7 @@ public: if (200L == HttpResponseCode || 206L == HttpResponseCode) { value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); RetryStuff->Offset += value.size(); + RetryStuff->SizeLimit -= value.size(); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); } else if (HttpResponseCode && !RetryStuff->NextRetryDelay) { if (ErrorText.size() < 256_KB) @@ -485,7 +484,7 @@ private: } } - if (retryStuff->NextRetryDelay && retryStuff->Offset < retryStuff->SizeLimit) + if (retryStuff->NextRetryDelay && retryStuff->SizeLimit > 0ULL) actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); else actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished)); |