aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-05-26 19:20:25 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-05-26 19:20:25 +0300
commit8f21373b1e04fcabcebf5ae3f4f31ee3c72b439b (patch)
treefde8bd288e6fa4eec604a5358c742bc668fb13a8
parentd185b7a14071428926aa92055e36b5ea2331b3e3 (diff)
downloadydb-8f21373b1e04fcabcebf5ae3f4f31ee3c72b439b.tar.gz
YQ-727 S3 coro read with backpressure.
ref:a4e27ef29e20da0131d9e6638d8ae2dc6832ef6b
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp216
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h52
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp21
4 files changed, 193 insertions, 97 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index c9b16436ab..8895c05587 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -16,10 +16,9 @@ namespace {
class TEasyCurl {
public:
using TPtr = std::shared_ptr<TEasyCurl>;
- using TWeakPtr = std::weak_ptr<TEasyCurl>;
- TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData)
- : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes)
+ TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, bool withData)
+ : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes)
{
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L);
@@ -53,22 +52,19 @@ public:
}
}
- std::size_t GetExpectedSize() const {
- return ExpectedSize;
- }
-
CURL* GetHandle() const {
return Handle;
}
- // return true if callback successfully added to this work
- virtual bool AddCallback(IHTTPGateway::TOnResult callback) = 0;
virtual void Fail(const TIssue& error) = 0;
virtual void Done(CURLcode result) = 0;
virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0;
virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0;
-
+protected:
+ void SkipTo(size_t offset) const {
+ curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset + offset) += '-').c_str());
+ }
private:
static size_t
WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) {
@@ -82,8 +78,7 @@ private:
return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb);
};
- const std::size_t Offset;
- const std::size_t ExpectedSize;
+ const size_t Offset;
CURL *const Handle;
curl_slist* Headers = nullptr;
const NMonitoring::TDynamicCounters::TCounterPtr Counter;
@@ -92,25 +87,33 @@ private:
class TEasyCurlBuffer : public TEasyCurl {
public:
- TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback)
- : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer)
+ using TPtr = std::shared_ptr<TEasyCurlBuffer>;
+ using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
+
+ TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback)
+ : TEasyCurl(counter, downloadedBytes, url, headers, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer)
{
- Output.Reserve(expectedSize);
+ Output.Reserve(ExpectedSize);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) {
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) {
return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
}
-private:
- bool AddCallback(IHTTPGateway::TOnResult callback) final {
+
+ size_t GetExpectedSize() const {
+ return ExpectedSize;
+ }
+
+ // return true if callback successfully added to this work
+ bool AddCallback(IHTTPGateway::TOnResult callback) {
const std::unique_lock lock(SyncCallbacks);
if (Callbacks.empty())
return false;
Callbacks.emplace(std::move(callback));
return true;
}
-
+private:
void Fail(const TIssue& error) final {
TIssues issues{error};
const std::unique_lock lock(SyncCallbacks);
@@ -147,6 +150,7 @@ private:
return Input.Read(buffer, size * nmemb);
};
+ const size_t ExpectedSize;
const TString Data;
TString Buffer;
TStringInput Input;
@@ -158,18 +162,35 @@ private:
class TEasyCurlStream : public TEasyCurl {
public:
- TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
- : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish))
- {
+ using TPtr = std::shared_ptr<TEasyCurlStream>;
+ using TWeakPtr = std::weak_ptr<TEasyCurlStream>;
+
+ TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
+ : TEasyCurl(counter, downloadedBytes, url, headers, offset, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ {}
+
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
+ return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
- return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish));
+ enum class EAction : i8 {
+ Stop = -1,
+ None = 0,
+ Work = 1
+ };
+
+ EAction GetAction(size_t buffersSize) {
+ if (Working != Counter->load() < buffersSize) {
+ if (Working = !Working)
+ SkipTo(Position);
+ return Working ? EAction::Work : EAction::Stop;
+ }
+
+ return EAction::None;
}
private:
- bool AddCallback(IHTTPGateway::TOnResult) final { return false; }
-
void Fail(const TIssue& error) final {
+ Working = false;
return OnFinish(TIssues{error});
}
@@ -177,12 +198,14 @@ private:
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
+ Working = false;
return OnFinish(std::nullopt);
}
- size_t Write(void* contents, size_t size, size_t nmemb) final {
+ size_t Write(void* contents, size_t size, size_t nmemb) final {
const auto realsize = size * nmemb;
- OnNewData(IHTTPGateway::TContent(TString(static_cast<char*>(contents), realsize)));
+ Position += realsize;
+ OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter));
return realsize;
};
@@ -190,9 +213,12 @@ private:
const IHTTPGateway::TOnNewDataPart OnNewData;
const IHTTPGateway::TOnDownloadFinish OnFinish;
+ const std::shared_ptr<std::atomic_size_t> Counter;
+ bool Working = false;
+ size_t Position = 0ULL;
};
-using TKeyType = std::tuple<TString, std::size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
+using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
class TKeyHash {
public:
@@ -222,7 +248,7 @@ public:
: Counters(std::move(counters))
, Rps(Counters->GetCounter("Requests", true))
, InFlight(Counters->GetCounter("InFlight"))
- , StraightInFlight(Counters->GetCounter("StraightInFlight"))
+ , InFlightStreams(Counters->GetCounter("InFlightStreams"))
, MaxInFlight(Counters->GetCounter("MaxInFlight"))
, AllocatedMemory(Counters->GetCounter("AllocatedMemory"))
, MaxAllocatedMemory(Counters->GetCounter("MaxAllocatedMemory"))
@@ -242,6 +268,10 @@ public:
MaxSimulatenousDownloadsSize = httpGatewaysCfg->GetMaxSimulatenousDownloadsSize();
}
MaxAllocatedMemory->Set(MaxSimulatenousDownloadsSize);
+
+ if (httpGatewaysCfg->HasBuffersSizePerStream()) {
+ BuffersSizePerStream = httpGatewaysCfg->GetBuffersSizePerStream();
+ }
}
TaskScheduler.Start();
@@ -253,8 +283,9 @@ public:
Thread.join();
}
private:
- std::size_t MaxHandlers = 1024U;
- std::size_t MaxSimulatenousDownloadsSize = 8_GB;
+ size_t MaxHandlers = 1024U;
+ size_t MaxSimulatenousDownloadsSize = 8_GB;
+ size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 1U;
static void Perform(const TWeakPtr& weak) {
OutputSize.store(0ULL);
@@ -310,7 +341,25 @@ private:
size_t FillHandlers() {
const std::unique_lock lock(Sync);
- const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize();
+
+ for (auto it = Streams.cbegin(); Streams.cend() != it;) {
+ if (const auto& stream = it->lock()) {
+ switch (stream->GetAction(BuffersSizePerStream)) {
+ case TEasyCurlStream::EAction::Work:
+ curl_multi_add_handle(Handle, stream->GetHandle());
+ break;
+ case TEasyCurlStream::EAction::Stop:
+ curl_multi_remove_handle(Handle, stream->GetHandle());
+ break;
+ case TEasyCurlStream::EAction::None:
+ break;
+ }
+ ++it;
+ } else
+ it = Streams.erase(it);
+ }
+
+ const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize();
AwaitQueueTopExpectedSize->Set(topExpectedSize);
while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetExpectedSize() <= MaxSimulatenousDownloadsSize) {
AllocatedSize += Await.front()->GetExpectedSize();
@@ -334,14 +383,16 @@ private:
easy = std::move(it->second);
curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
- if (const auto stateIt = Easy2RetryState.find(easy); stateIt != Easy2RetryState.end()) {
- if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) {
- Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay));
- } else {
- Easy2RetryState.erase(stateIt);
+ if (const auto& buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) {
+ if (const auto stateIt = Easy2RetryState.find(buffer); stateIt != Easy2RetryState.cend()) {
+ if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) {
+ Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(buffer, Singleton), *nextRetryDelay));
+ } else {
+ Easy2RetryState.erase(stateIt);
+ }
}
+ AllocatedSize -= buffer->GetExpectedSize();
}
- AllocatedSize -= easy->GetExpectedSize();
Allocated.erase(it);
}
@@ -360,7 +411,6 @@ private:
for (const auto& item : Allocated) {
works.emplace(std::move(item.second));
- AllocatedSize -= works.top()->GetExpectedSize();
}
AllocatedSize = 0ULL;
@@ -379,7 +429,7 @@ private:
void Download(
TString url,
THeaders headers,
- std::size_t expectedSize,
+ size_t expectedSize,
TOnResult callback,
TString data,
IRetryPolicy<long>::TPtr retryPolicy) final
@@ -393,7 +443,6 @@ private:
}
const std::unique_lock lock(Sync);
auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)];
- StraightInFlight->Set(Requests.size());
if (const auto& easy = entry.lock())
if (easy->AddCallback(callback))
return;
@@ -408,25 +457,27 @@ private:
void Download(
TString url,
THeaders headers,
- std::size_t offset,
+ size_t offset,
TOnNewDataPart onNewData,
TOnDownloadFinish onFinish) final
{
- constexpr auto buffersSize = CURL_MAX_WRITE_SIZE << 4U;
- auto easy = TEasyCurlStream::Make(InFlight, DownloadedBytes, std::move(url), std::move(headers), offset, buffersSize, std::move(onNewData), std::move(onFinish));
+
+ auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
- Await.emplace(std::move(easy));
- Wakeup(buffersSize);
+ const auto handle = stream->GetHandle();
+ Streams.emplace_back(stream);
+ Allocated.emplace(handle, std::move(stream));
+ Wakeup(0ULL);
}
- void OnRetry(TEasyCurl::TPtr easy) {
+ void OnRetry(TEasyCurlBuffer::TPtr easy) {
const std::unique_lock lock(Sync);
- const std::size_t expectedSize = easy->GetExpectedSize();
+ const size_t expectedSize = easy->GetExpectedSize();
Await.emplace(std::move(easy));
Wakeup(expectedSize);
}
- void Wakeup(std::size_t expectedSize) {
+ void Wakeup(size_t expectedSize) {
AwaitQueue->Set(Await.size());
if (Allocated.size() < MaxHandlers && AllocatedSize + expectedSize + OutputSize.load() <= MaxSimulatenousDownloadsSize) {
curl_multi_wakeup(Handle);
@@ -436,7 +487,7 @@ private:
class THttpGatewayTask: public TTaskScheduler::IRepeatedTask {
public:
THttpGatewayTask(
- TEasyCurl::TPtr easy,
+ TEasyCurlBuffer::TPtr easy,
THTTPMultiGateway::TWeakPtr gateway)
: Easy(easy)
, Gateway(gateway)
@@ -450,23 +501,24 @@ private:
return false;
}
private:
- TEasyCurl::TPtr Easy;
+ TEasyCurlBuffer::TPtr Easy;
THTTPMultiGateway::TWeakPtr Gateway;
};
private:
CURLM* Handle = nullptr;
- std::queue<TEasyCurl::TPtr> Await;
+ std::queue<TEasyCurlBuffer::TPtr> Await;
+ std::vector<TEasyCurlStream::TWeakPtr> Streams;
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
- std::unordered_map<TKeyType, TEasyCurl::TWeakPtr, TKeyHash> Requests;
- std::unordered_map<TEasyCurl::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState;
+ std::unordered_map<TKeyType, TEasyCurlBuffer::TWeakPtr, TKeyHash> Requests;
+ std::unordered_map<TEasyCurlBuffer::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState;
std::mutex Sync;
std::thread Thread;
- std::size_t AllocatedSize = 0ULL;
+ size_t AllocatedSize = 0ULL;
static std::atomic_size_t OutputSize;
static std::mutex CreateSync;
@@ -475,7 +527,7 @@ private:
const NMonitoring::TDynamicCounterPtr Counters;
const NMonitoring::TDynamicCounters::TCounterPtr Rps;
const NMonitoring::TDynamicCounters::TCounterPtr InFlight;
- const NMonitoring::TDynamicCounters::TCounterPtr StraightInFlight; // doesn't consider merged requests which use one curl
+ const NMonitoring::TDynamicCounters::TCounterPtr InFlightStreams;
const NMonitoring::TDynamicCounters::TCounterPtr MaxInFlight;
const NMonitoring::TDynamicCounters::TCounterPtr AllocatedMemory;
const NMonitoring::TDynamicCounters::TCounterPtr MaxAllocatedMemory;
@@ -494,44 +546,61 @@ THTTPMultiGateway::TWeakPtr THTTPMultiGateway::Singleton;
}
-IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode)
+IHTTPGateway::TContentBase::TContentBase(TString&& data)
: TString(std::move(data))
- , HttpResponseCode(httpResponseCode)
{
if (!empty()) {
THTTPMultiGateway::OutputSize.fetch_add(size());
}
}
-IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode)
+IHTTPGateway::TContentBase::TContentBase(const TString& data)
: TString(data)
- , HttpResponseCode(httpResponseCode)
{
if (!empty()) {
THTTPMultiGateway::OutputSize.fetch_add(size());
}
}
-IHTTPGateway::TContent::TContent(TString&& data)
- : TContent(std::move(data), 0)
-{}
-
-IHTTPGateway::TContent::TContent(const TString& data)
- : TContent(data, 0)
-{}
-
-TString IHTTPGateway::TContent::Extract() {
+IHTTPGateway::TContentBase::~TContentBase()
+{
if (!empty()) {
THTTPMultiGateway::OutputSize.fetch_sub(size());
}
- return std::move(*this);
}
-IHTTPGateway::TContent::~TContent()
-{
+TString IHTTPGateway::TContentBase::Extract() {
if (!empty()) {
THTTPMultiGateway::OutputSize.fetch_sub(size());
}
+ return std::move(*this);
+}
+
+IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode)
+ : TContentBase(std::move(data))
+ , HttpResponseCode(httpResponseCode)
+{}
+
+IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode)
+ : TContentBase(data)
+ , HttpResponseCode(httpResponseCode)
+{}
+
+
+IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter)
+ : TContentBase(std::move(data)), Counter(counter)
+{
+ Counter->fetch_add(size());
+}
+
+IHTTPGateway::TCountedContent::~TCountedContent()
+{
+ Counter->fetch_sub(size());
+}
+
+TString IHTTPGateway::TCountedContent::Extract() {
+ Counter->fetch_sub(size());
+ return TContentBase::Extract();
}
IHTTPGateway::TPtr
@@ -546,5 +615,4 @@ IHTTPGateway::Make(const THttpGatewayConfig* httpGatewaysCfg, NMonitoring::TDyna
return gateway;
}
-
}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 7b745323b5..d5979bcce1 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -8,6 +8,7 @@
#include <library/cpp/retry/retry_policy.h>
#include <library/cpp/threading/task_scheduler/task_scheduler.h>
+#include <atomic>
#include <variant>
#include <functional>
@@ -24,28 +25,36 @@ public:
const THttpGatewayConfig* httpGatewaysCfg = nullptr,
NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
- class TContent : private TString {
- friend class TEasyCurl;
- public:
- TContent(TString&& data, long httpResponseCode);
- TContent(const TString& data, long httpResponseCode);
- TContent(TString&& data);
- TContent(const TString& data);
+ class TContentBase : protected TString {
+ protected:
+ TContentBase(TString&& data);
+ TContentBase(const TString& data);
- TString Extract();
- ~TContent();
-
- TContent(TContent&&) = default;
- TContent& operator=(TContent&&) = default;
+ TContentBase(TContentBase&&) = default;
+ TContentBase& operator=(TContentBase&&) = default;
+ ~TContentBase();
+ TString Extract();
+ public:
using TString::size;
using TString::data;
inline operator std::string_view() const { return { data(), size() }; }
private:
- TContent(const TContent&) = delete;
- TContent& operator=(const TContent&) = delete;
+ TContentBase(const TContentBase&) = delete;
+ TContentBase& operator=(const TContentBase&) = delete;
+ };
+
+ class TContent : public TContentBase {
+ friend class TEasyCurl;
+ public:
+ TContent(TString&& data, long httpResponseCode = 0LL);
+ TContent(const TString& data, long httpResponseCode = 0LL);
+
+ TContent(TContent&& src) = default;
+ TContent& operator=(TContent&& src) = default;
+ using TContentBase::Extract;
public:
long HttpResponseCode;
};
@@ -63,7 +72,20 @@ public:
IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()
) = 0;
- using TOnNewDataPart = std::function<void(TContent&&)>;
+ class TCountedContent : public TContentBase {
+ public:
+ TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter);
+ ~TCountedContent();
+
+ TCountedContent(TCountedContent&&) = default;
+ TCountedContent& operator=(TCountedContent&& src) = default;
+
+ TString Extract();
+ private:
+ const std::shared_ptr<std::atomic_size_t> Counter;
+ };
+
+ using TOnNewDataPart = std::function<void(TCountedContent&&)>;
using TOnDownloadFinish = std::function<void(std::optional<TIssues>)>;
virtual void Download(
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 6155aa7660..2910c9c248 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -55,6 +55,7 @@ enum ETokenType {
message THttpGatewayConfig {
optional uint32 MaxInFlightCount = 1;
optional uint64 MaxSimulatenousDownloadsSize = 2;
+ optional uint32 BuffersSizePerStream = 3;
}
/////////////////////////////// YT ///////////////////////////////
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 8c9b01ccf3..3c47e5d963 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -72,6 +72,11 @@ struct TEvPrivate {
const size_t PathIndex;
};
+ struct TEvDataPart : public TEventLocal<TEvDataPart, EvReadResult> {
+ TEvDataPart(IHTTPGateway::TCountedContent&& data) : Result(std::move(data)) {}
+ IHTTPGateway::TCountedContent Result;
+ };
+
struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {};
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
@@ -281,7 +286,7 @@ private:
class TReadBufferFromStream : public NDB::ReadBuffer {
public:
TReadBufferFromStream(TS3ReadCoroImpl* coro)
- : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro), Value(TString())
+ : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro)
{}
private:
bool nextImpl() final {
@@ -294,18 +299,18 @@ private:
}
TS3ReadCoroImpl *const Coro;
- IHTTPGateway::TContent Value;
+ TString Value;
};
public:
TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& sourceActorId, const NActors::TActorId& computeActorId, const TReadSpec::TPtr& readSpec)
: TActorCoroImpl(256_KB), InputIndex(inputIndex), ReadSpec(readSpec), SourceActorId(sourceActorId), ComputeActorId(computeActorId)
{}
- bool Next(IHTTPGateway::TContent& value) {
+ bool Next(TString& value) {
if (Finished)
return false;
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
switch (const auto etype = ev->GetTypeRewrite()) {
case TEvPrivate::TEvReadFinished::EventType:
Finished = true;
@@ -313,8 +318,8 @@ public:
case TEvPrivate::TEvReadError::EventType:
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
return false;
- case TEvPrivate::TEvReadResult::EventType:
- value = std::move(ev->Get<TEvPrivate::TEvReadResult>()->Result);
+ case TEvPrivate::TEvDataPart::EventType:
+ value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
return true;
default:
@@ -380,10 +385,10 @@ public:
, RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize))
{}
private:
- static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TContent&& data) {
+ static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TCountedContent&& data) {
retryStuff->Offset += data.size();
retryStuff->RetryParams.Reset();
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadResult(std::move(data))));
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));
}
static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::optional<TIssues> result) {