diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-05-26 19:20:25 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-05-26 19:20:25 +0300 |
commit | 8f21373b1e04fcabcebf5ae3f4f31ee3c72b439b (patch) | |
tree | fde8bd288e6fa4eec604a5358c742bc668fb13a8 | |
parent | d185b7a14071428926aa92055e36b5ea2331b3e3 (diff) | |
download | ydb-8f21373b1e04fcabcebf5ae3f4f31ee3c72b439b.tar.gz |
YQ-727 S3 coro read with backpressure.
ref:a4e27ef29e20da0131d9e6638d8ae2dc6832ef6b
4 files changed, 193 insertions, 97 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index c9b16436ab..8895c05587 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -16,10 +16,9 @@ namespace { class TEasyCurl { public: using TPtr = std::shared_ptr<TEasyCurl>; - using TWeakPtr = std::weak_ptr<TEasyCurl>; - TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData) - : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes) + TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, bool withData) + : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes) { curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L); @@ -53,22 +52,19 @@ public: } } - std::size_t GetExpectedSize() const { - return ExpectedSize; - } - CURL* GetHandle() const { return Handle; } - // return true if callback successfully added to this work - virtual bool AddCallback(IHTTPGateway::TOnResult callback) = 0; virtual void Fail(const TIssue& error) = 0; virtual void Done(CURLcode result) = 0; virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0; virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0; - +protected: + void SkipTo(size_t offset) const { + curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset + offset) += '-').c_str()); + } private: static size_t WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) { @@ -82,8 +78,7 @@ private: return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb); }; - const std::size_t Offset; - const std::size_t ExpectedSize; + const size_t Offset; CURL *const Handle; curl_slist* Headers = nullptr; const NMonitoring::TDynamicCounters::TCounterPtr Counter; @@ -92,25 +87,33 @@ private: class TEasyCurlBuffer : public TEasyCurl { public: - TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) - : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer) + using TPtr = std::shared_ptr<TEasyCurlBuffer>; + using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; + + TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) + : TEasyCurl(counter, downloadedBytes, url, headers, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer) { - Output.Reserve(expectedSize); + Output.Reserve(ExpectedSize); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) { return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); } -private: - bool AddCallback(IHTTPGateway::TOnResult callback) final { + + size_t GetExpectedSize() const { + return ExpectedSize; + } + + // return true if callback successfully added to this work + bool AddCallback(IHTTPGateway::TOnResult callback) { const std::unique_lock lock(SyncCallbacks); if (Callbacks.empty()) return false; Callbacks.emplace(std::move(callback)); return true; } - +private: void Fail(const TIssue& error) final { TIssues issues{error}; const std::unique_lock lock(SyncCallbacks); @@ -147,6 +150,7 @@ private: return Input.Read(buffer, size * nmemb); }; + const size_t ExpectedSize; const TString Data; TString Buffer; TStringInput Input; @@ -158,18 +162,35 @@ private: class TEasyCurlStream : public TEasyCurl { public: - TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) - : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)) - { + using TPtr = std::shared_ptr<TEasyCurlStream>; + using TWeakPtr = std::weak_ptr<TEasyCurlStream>; + + TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) + : TEasyCurl(counter, downloadedBytes, url, headers, offset, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + {} + + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { - return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish)); + enum class EAction : i8 { + Stop = -1, + None = 0, + Work = 1 + }; + + EAction GetAction(size_t buffersSize) { + if (Working != Counter->load() < buffersSize) { + if (Working = !Working) + SkipTo(Position); + return Working ? EAction::Work : EAction::Stop; + } + + return EAction::None; } private: - bool AddCallback(IHTTPGateway::TOnResult) final { return false; } - void Fail(const TIssue& error) final { + Working = false; return OnFinish(TIssues{error}); } @@ -177,12 +198,14 @@ private: if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); + Working = false; return OnFinish(std::nullopt); } - size_t Write(void* contents, size_t size, size_t nmemb) final { + size_t Write(void* contents, size_t size, size_t nmemb) final { const auto realsize = size * nmemb; - OnNewData(IHTTPGateway::TContent(TString(static_cast<char*>(contents), realsize))); + Position += realsize; + OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter)); return realsize; }; @@ -190,9 +213,12 @@ private: const IHTTPGateway::TOnNewDataPart OnNewData; const IHTTPGateway::TOnDownloadFinish OnFinish; + const std::shared_ptr<std::atomic_size_t> Counter; + bool Working = false; + size_t Position = 0ULL; }; -using TKeyType = std::tuple<TString, std::size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; +using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; class TKeyHash { public: @@ -222,7 +248,7 @@ public: : Counters(std::move(counters)) , Rps(Counters->GetCounter("Requests", true)) , InFlight(Counters->GetCounter("InFlight")) - , StraightInFlight(Counters->GetCounter("StraightInFlight")) + , InFlightStreams(Counters->GetCounter("InFlightStreams")) , MaxInFlight(Counters->GetCounter("MaxInFlight")) , AllocatedMemory(Counters->GetCounter("AllocatedMemory")) , MaxAllocatedMemory(Counters->GetCounter("MaxAllocatedMemory")) @@ -242,6 +268,10 @@ public: MaxSimulatenousDownloadsSize = httpGatewaysCfg->GetMaxSimulatenousDownloadsSize(); } MaxAllocatedMemory->Set(MaxSimulatenousDownloadsSize); + + if (httpGatewaysCfg->HasBuffersSizePerStream()) { + BuffersSizePerStream = httpGatewaysCfg->GetBuffersSizePerStream(); + } } TaskScheduler.Start(); @@ -253,8 +283,9 @@ public: Thread.join(); } private: - std::size_t MaxHandlers = 1024U; - std::size_t MaxSimulatenousDownloadsSize = 8_GB; + size_t MaxHandlers = 1024U; + size_t MaxSimulatenousDownloadsSize = 8_GB; + size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 1U; static void Perform(const TWeakPtr& weak) { OutputSize.store(0ULL); @@ -310,7 +341,25 @@ private: size_t FillHandlers() { const std::unique_lock lock(Sync); - const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize(); + + for (auto it = Streams.cbegin(); Streams.cend() != it;) { + if (const auto& stream = it->lock()) { + switch (stream->GetAction(BuffersSizePerStream)) { + case TEasyCurlStream::EAction::Work: + curl_multi_add_handle(Handle, stream->GetHandle()); + break; + case TEasyCurlStream::EAction::Stop: + curl_multi_remove_handle(Handle, stream->GetHandle()); + break; + case TEasyCurlStream::EAction::None: + break; + } + ++it; + } else + it = Streams.erase(it); + } + + const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize(); AwaitQueueTopExpectedSize->Set(topExpectedSize); while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetExpectedSize() <= MaxSimulatenousDownloadsSize) { AllocatedSize += Await.front()->GetExpectedSize(); @@ -334,14 +383,16 @@ private: easy = std::move(it->second); curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); - if (const auto stateIt = Easy2RetryState.find(easy); stateIt != Easy2RetryState.end()) { - if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) { - Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay)); - } else { - Easy2RetryState.erase(stateIt); + if (const auto& buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) { + if (const auto stateIt = Easy2RetryState.find(buffer); stateIt != Easy2RetryState.cend()) { + if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) { + Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(buffer, Singleton), *nextRetryDelay)); + } else { + Easy2RetryState.erase(stateIt); + } } + AllocatedSize -= buffer->GetExpectedSize(); } - AllocatedSize -= easy->GetExpectedSize(); Allocated.erase(it); } @@ -360,7 +411,6 @@ private: for (const auto& item : Allocated) { works.emplace(std::move(item.second)); - AllocatedSize -= works.top()->GetExpectedSize(); } AllocatedSize = 0ULL; @@ -379,7 +429,7 @@ private: void Download( TString url, THeaders headers, - std::size_t expectedSize, + size_t expectedSize, TOnResult callback, TString data, IRetryPolicy<long>::TPtr retryPolicy) final @@ -393,7 +443,6 @@ private: } const std::unique_lock lock(Sync); auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; - StraightInFlight->Set(Requests.size()); if (const auto& easy = entry.lock()) if (easy->AddCallback(callback)) return; @@ -408,25 +457,27 @@ private: void Download( TString url, THeaders headers, - std::size_t offset, + size_t offset, TOnNewDataPart onNewData, TOnDownloadFinish onFinish) final { - constexpr auto buffersSize = CURL_MAX_WRITE_SIZE << 4U; - auto easy = TEasyCurlStream::Make(InFlight, DownloadedBytes, std::move(url), std::move(headers), offset, buffersSize, std::move(onNewData), std::move(onFinish)); + + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); - Await.emplace(std::move(easy)); - Wakeup(buffersSize); + const auto handle = stream->GetHandle(); + Streams.emplace_back(stream); + Allocated.emplace(handle, std::move(stream)); + Wakeup(0ULL); } - void OnRetry(TEasyCurl::TPtr easy) { + void OnRetry(TEasyCurlBuffer::TPtr easy) { const std::unique_lock lock(Sync); - const std::size_t expectedSize = easy->GetExpectedSize(); + const size_t expectedSize = easy->GetExpectedSize(); Await.emplace(std::move(easy)); Wakeup(expectedSize); } - void Wakeup(std::size_t expectedSize) { + void Wakeup(size_t expectedSize) { AwaitQueue->Set(Await.size()); if (Allocated.size() < MaxHandlers && AllocatedSize + expectedSize + OutputSize.load() <= MaxSimulatenousDownloadsSize) { curl_multi_wakeup(Handle); @@ -436,7 +487,7 @@ private: class THttpGatewayTask: public TTaskScheduler::IRepeatedTask { public: THttpGatewayTask( - TEasyCurl::TPtr easy, + TEasyCurlBuffer::TPtr easy, THTTPMultiGateway::TWeakPtr gateway) : Easy(easy) , Gateway(gateway) @@ -450,23 +501,24 @@ private: return false; } private: - TEasyCurl::TPtr Easy; + TEasyCurlBuffer::TPtr Easy; THTTPMultiGateway::TWeakPtr Gateway; }; private: CURLM* Handle = nullptr; - std::queue<TEasyCurl::TPtr> Await; + std::queue<TEasyCurlBuffer::TPtr> Await; + std::vector<TEasyCurlStream::TWeakPtr> Streams; std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated; - std::unordered_map<TKeyType, TEasyCurl::TWeakPtr, TKeyHash> Requests; - std::unordered_map<TEasyCurl::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState; + std::unordered_map<TKeyType, TEasyCurlBuffer::TWeakPtr, TKeyHash> Requests; + std::unordered_map<TEasyCurlBuffer::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState; std::mutex Sync; std::thread Thread; - std::size_t AllocatedSize = 0ULL; + size_t AllocatedSize = 0ULL; static std::atomic_size_t OutputSize; static std::mutex CreateSync; @@ -475,7 +527,7 @@ private: const NMonitoring::TDynamicCounterPtr Counters; const NMonitoring::TDynamicCounters::TCounterPtr Rps; const NMonitoring::TDynamicCounters::TCounterPtr InFlight; - const NMonitoring::TDynamicCounters::TCounterPtr StraightInFlight; // doesn't consider merged requests which use one curl + const NMonitoring::TDynamicCounters::TCounterPtr InFlightStreams; const NMonitoring::TDynamicCounters::TCounterPtr MaxInFlight; const NMonitoring::TDynamicCounters::TCounterPtr AllocatedMemory; const NMonitoring::TDynamicCounters::TCounterPtr MaxAllocatedMemory; @@ -494,44 +546,61 @@ THTTPMultiGateway::TWeakPtr THTTPMultiGateway::Singleton; } -IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode) +IHTTPGateway::TContentBase::TContentBase(TString&& data) : TString(std::move(data)) - , HttpResponseCode(httpResponseCode) { if (!empty()) { THTTPMultiGateway::OutputSize.fetch_add(size()); } } -IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode) +IHTTPGateway::TContentBase::TContentBase(const TString& data) : TString(data) - , HttpResponseCode(httpResponseCode) { if (!empty()) { THTTPMultiGateway::OutputSize.fetch_add(size()); } } -IHTTPGateway::TContent::TContent(TString&& data) - : TContent(std::move(data), 0) -{} - -IHTTPGateway::TContent::TContent(const TString& data) - : TContent(data, 0) -{} - -TString IHTTPGateway::TContent::Extract() { +IHTTPGateway::TContentBase::~TContentBase() +{ if (!empty()) { THTTPMultiGateway::OutputSize.fetch_sub(size()); } - return std::move(*this); } -IHTTPGateway::TContent::~TContent() -{ +TString IHTTPGateway::TContentBase::Extract() { if (!empty()) { THTTPMultiGateway::OutputSize.fetch_sub(size()); } + return std::move(*this); +} + +IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode) + : TContentBase(std::move(data)) + , HttpResponseCode(httpResponseCode) +{} + +IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode) + : TContentBase(data) + , HttpResponseCode(httpResponseCode) +{} + + +IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter) + : TContentBase(std::move(data)), Counter(counter) +{ + Counter->fetch_add(size()); +} + +IHTTPGateway::TCountedContent::~TCountedContent() +{ + Counter->fetch_sub(size()); +} + +TString IHTTPGateway::TCountedContent::Extract() { + Counter->fetch_sub(size()); + return TContentBase::Extract(); } IHTTPGateway::TPtr @@ -546,5 +615,4 @@ IHTTPGateway::Make(const THttpGatewayConfig* httpGatewaysCfg, NMonitoring::TDyna return gateway; } - } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 7b745323b5..d5979bcce1 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -8,6 +8,7 @@ #include <library/cpp/retry/retry_policy.h> #include <library/cpp/threading/task_scheduler/task_scheduler.h> +#include <atomic> #include <variant> #include <functional> @@ -24,28 +25,36 @@ public: const THttpGatewayConfig* httpGatewaysCfg = nullptr, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); - class TContent : private TString { - friend class TEasyCurl; - public: - TContent(TString&& data, long httpResponseCode); - TContent(const TString& data, long httpResponseCode); - TContent(TString&& data); - TContent(const TString& data); + class TContentBase : protected TString { + protected: + TContentBase(TString&& data); + TContentBase(const TString& data); - TString Extract(); - ~TContent(); - - TContent(TContent&&) = default; - TContent& operator=(TContent&&) = default; + TContentBase(TContentBase&&) = default; + TContentBase& operator=(TContentBase&&) = default; + ~TContentBase(); + TString Extract(); + public: using TString::size; using TString::data; inline operator std::string_view() const { return { data(), size() }; } private: - TContent(const TContent&) = delete; - TContent& operator=(const TContent&) = delete; + TContentBase(const TContentBase&) = delete; + TContentBase& operator=(const TContentBase&) = delete; + }; + + class TContent : public TContentBase { + friend class TEasyCurl; + public: + TContent(TString&& data, long httpResponseCode = 0LL); + TContent(const TString& data, long httpResponseCode = 0LL); + + TContent(TContent&& src) = default; + TContent& operator=(TContent&& src) = default; + using TContentBase::Extract; public: long HttpResponseCode; }; @@ -63,7 +72,20 @@ public: IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy() ) = 0; - using TOnNewDataPart = std::function<void(TContent&&)>; + class TCountedContent : public TContentBase { + public: + TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter); + ~TCountedContent(); + + TCountedContent(TCountedContent&&) = default; + TCountedContent& operator=(TCountedContent&& src) = default; + + TString Extract(); + private: + const std::shared_ptr<std::atomic_size_t> Counter; + }; + + using TOnNewDataPart = std::function<void(TCountedContent&&)>; using TOnDownloadFinish = std::function<void(std::optional<TIssues>)>; virtual void Download( diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 6155aa7660..2910c9c248 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -55,6 +55,7 @@ enum ETokenType { message THttpGatewayConfig { optional uint32 MaxInFlightCount = 1; optional uint64 MaxSimulatenousDownloadsSize = 2; + optional uint32 BuffersSizePerStream = 3; } /////////////////////////////// YT /////////////////////////////// diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 8c9b01ccf3..3c47e5d963 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -72,6 +72,11 @@ struct TEvPrivate { const size_t PathIndex; }; + struct TEvDataPart : public TEventLocal<TEvDataPart, EvReadResult> { + TEvDataPart(IHTTPGateway::TCountedContent&& data) : Result(std::move(data)) {} + IHTTPGateway::TCountedContent Result; + }; + struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {}; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { @@ -281,7 +286,7 @@ private: class TReadBufferFromStream : public NDB::ReadBuffer { public: TReadBufferFromStream(TS3ReadCoroImpl* coro) - : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro), Value(TString()) + : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro) {} private: bool nextImpl() final { @@ -294,18 +299,18 @@ private: } TS3ReadCoroImpl *const Coro; - IHTTPGateway::TContent Value; + TString Value; }; public: TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& sourceActorId, const NActors::TActorId& computeActorId, const TReadSpec::TPtr& readSpec) : TActorCoroImpl(256_KB), InputIndex(inputIndex), ReadSpec(readSpec), SourceActorId(sourceActorId), ComputeActorId(computeActorId) {} - bool Next(IHTTPGateway::TContent& value) { + bool Next(TString& value) { if (Finished) return false; - const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); + const auto ev = WaitForSpecificEvent<TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); switch (const auto etype = ev->GetTypeRewrite()) { case TEvPrivate::TEvReadFinished::EventType: Finished = true; @@ -313,8 +318,8 @@ public: case TEvPrivate::TEvReadError::EventType: Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); return false; - case TEvPrivate::TEvReadResult::EventType: - value = std::move(ev->Get<TEvPrivate::TEvReadResult>()->Result); + case TEvPrivate::TEvDataPart::EventType: + value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); return true; default: @@ -380,10 +385,10 @@ public: , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize)) {} private: - static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TContent&& data) { + static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TCountedContent&& data) { retryStuff->Offset += data.size(); retryStuff->RetryParams.Reset(); - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadResult(std::move(data)))); + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data)))); } static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::optional<TIssues> result) { |