diff options
| author | a-romanov <[email protected]> | 2022-05-26 19:20:25 +0300 | 
|---|---|---|
| committer | a-romanov <[email protected]> | 2022-05-26 19:20:25 +0300 | 
| commit | 8f21373b1e04fcabcebf5ae3f4f31ee3c72b439b (patch) | |
| tree | fde8bd288e6fa4eec604a5358c742bc668fb13a8 | |
| parent | d185b7a14071428926aa92055e36b5ea2331b3e3 (diff) | |
YQ-727 S3 coro read with backpressure.
ref:a4e27ef29e20da0131d9e6638d8ae2dc6832ef6b
4 files changed, 193 insertions, 97 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index c9b16436abe..8895c05587f 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -16,10 +16,9 @@ namespace {  class TEasyCurl {  public:      using TPtr = std::shared_ptr<TEasyCurl>; -    using TWeakPtr = std::weak_ptr<TEasyCurl>; -    TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData) -        : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes) +    TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, bool withData) +        : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes)      {          curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());          curl_easy_setopt(Handle, CURLOPT_POST, withData ?  1L : 0L); @@ -53,22 +52,19 @@ public:          }      } -    std::size_t GetExpectedSize() const { -        return ExpectedSize; -    } -      CURL* GetHandle() const {          return Handle;      } -    // return true if callback successfully added to this work -    virtual bool AddCallback(IHTTPGateway::TOnResult callback) = 0;      virtual void Fail(const TIssue& error) = 0;      virtual void Done(CURLcode result) = 0;      virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0;      virtual size_t Read(char *buffer, size_t size, size_t nmemb) = 0; - +protected: +    void SkipTo(size_t offset) const { +        curl_easy_setopt(Handle, CURLOPT_RANGE,  (ToString(Offset + offset) += '-').c_str()); +    }  private:      static size_t      WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) { @@ -82,8 +78,7 @@ private:          return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb);      }; -    const std::size_t Offset; -    const std::size_t ExpectedSize; +    const size_t Offset;      CURL *const Handle;      curl_slist* Headers = nullptr;      const NMonitoring::TDynamicCounters::TCounterPtr Counter; @@ -92,25 +87,33 @@ private:  class TEasyCurlBuffer : public TEasyCurl {  public: -    TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) -        : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer) +    using TPtr = std::shared_ptr<TEasyCurlBuffer>; +    using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; + +    TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) +        : TEasyCurl(counter, downloadedBytes, url, headers, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer)      { -        Output.Reserve(expectedSize); +        Output.Reserve(ExpectedSize);          Callbacks.emplace(std::move(callback));      } -    static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { +    static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) {          return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback));      } -private: -    bool AddCallback(IHTTPGateway::TOnResult callback) final { + +    size_t GetExpectedSize() const { +        return ExpectedSize; +    } + +    // return true if callback successfully added to this work +    bool AddCallback(IHTTPGateway::TOnResult callback) {          const std::unique_lock lock(SyncCallbacks);          if (Callbacks.empty())              return false;          Callbacks.emplace(std::move(callback));          return true;      } - +private:      void Fail(const TIssue& error) final  {          TIssues issues{error};          const std::unique_lock lock(SyncCallbacks); @@ -147,6 +150,7 @@ private:          return Input.Read(buffer, size * nmemb);      }; +    const size_t ExpectedSize;      const TString Data;      TString Buffer;      TStringInput Input; @@ -158,18 +162,35 @@ private:  class TEasyCurlStream : public TEasyCurl {  public: -    TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) -        : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)) -    { +    using TPtr = std::shared_ptr<TEasyCurlStream>; +    using TWeakPtr = std::weak_ptr<TEasyCurlStream>; + +    TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) +        : TEasyCurl(counter, downloadedBytes, url, headers, offset, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) +    {} + +    static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { +        return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));      } -    static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr&  counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { -        return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish)); +    enum class EAction : i8 { +        Stop = -1, +        None = 0, +        Work = 1 +    }; + +    EAction GetAction(size_t buffersSize) { +        if (Working != Counter->load() < buffersSize) { +            if (Working = !Working) +                SkipTo(Position); +            return Working ? EAction::Work : EAction::Stop; +        } + +        return EAction::None;      }  private: -    bool AddCallback(IHTTPGateway::TOnResult) final { return false; } -      void Fail(const TIssue& error) final  { +        Working = false;          return OnFinish(TIssues{error});      } @@ -177,12 +198,14 @@ private:          if (CURLE_OK != result)              return Fail(TIssue(curl_easy_strerror(result))); +        Working = false;          return OnFinish(std::nullopt);      } -    size_t  Write(void* contents, size_t size, size_t nmemb) final { +    size_t Write(void* contents, size_t size, size_t nmemb) final {          const auto realsize = size * nmemb; -        OnNewData(IHTTPGateway::TContent(TString(static_cast<char*>(contents), realsize))); +        Position += realsize; +        OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter));          return realsize;      }; @@ -190,9 +213,12 @@ private:      const IHTTPGateway::TOnNewDataPart OnNewData;      const IHTTPGateway::TOnDownloadFinish OnFinish; +    const std::shared_ptr<std::atomic_size_t> Counter; +    bool Working = false; +    size_t Position = 0ULL;  }; -using TKeyType = std::tuple<TString, std::size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; +using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;  class TKeyHash {  public: @@ -222,7 +248,7 @@ public:          : Counters(std::move(counters))          , Rps(Counters->GetCounter("Requests", true))          , InFlight(Counters->GetCounter("InFlight")) -        , StraightInFlight(Counters->GetCounter("StraightInFlight")) +        , InFlightStreams(Counters->GetCounter("InFlightStreams"))          , MaxInFlight(Counters->GetCounter("MaxInFlight"))          , AllocatedMemory(Counters->GetCounter("AllocatedMemory"))          , MaxAllocatedMemory(Counters->GetCounter("MaxAllocatedMemory")) @@ -242,6 +268,10 @@ public:                  MaxSimulatenousDownloadsSize = httpGatewaysCfg->GetMaxSimulatenousDownloadsSize();              }              MaxAllocatedMemory->Set(MaxSimulatenousDownloadsSize); + +            if (httpGatewaysCfg->HasBuffersSizePerStream()) { +                BuffersSizePerStream = httpGatewaysCfg->GetBuffersSizePerStream(); +            }          }          TaskScheduler.Start(); @@ -253,8 +283,9 @@ public:          Thread.join();      }  private: -    std::size_t MaxHandlers = 1024U; -    std::size_t MaxSimulatenousDownloadsSize = 8_GB; +    size_t MaxHandlers = 1024U; +    size_t MaxSimulatenousDownloadsSize = 8_GB; +    size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 1U;      static void Perform(const TWeakPtr& weak) {          OutputSize.store(0ULL); @@ -310,7 +341,25 @@ private:      size_t FillHandlers() {          const std::unique_lock lock(Sync); -        const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize();  + +        for (auto it = Streams.cbegin(); Streams.cend() != it;) { +            if (const auto& stream = it->lock()) { +                switch (stream->GetAction(BuffersSizePerStream)) { +                    case TEasyCurlStream::EAction::Work: +                        curl_multi_add_handle(Handle, stream->GetHandle()); +                        break; +                    case TEasyCurlStream::EAction::Stop: +                        curl_multi_remove_handle(Handle, stream->GetHandle()); +                        break; +                    case TEasyCurlStream::EAction::None: +                        break; +                } +                ++it; +            } else +                it = Streams.erase(it); +        } + +        const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize();          AwaitQueueTopExpectedSize->Set(topExpectedSize);          while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetExpectedSize() <= MaxSimulatenousDownloadsSize) {              AllocatedSize += Await.front()->GetExpectedSize(); @@ -334,14 +383,16 @@ private:                  easy = std::move(it->second);                  curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); -                if (const auto stateIt = Easy2RetryState.find(easy); stateIt != Easy2RetryState.end()) { -                    if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) { -                        Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay)); -                    } else { -                        Easy2RetryState.erase(stateIt); +                if (const auto& buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) { +                    if (const auto stateIt = Easy2RetryState.find(buffer); stateIt != Easy2RetryState.cend()) { +                        if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) { +                            Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(buffer, Singleton), *nextRetryDelay)); +                        } else { +                            Easy2RetryState.erase(stateIt); +                        }                      } +                    AllocatedSize -= buffer->GetExpectedSize();                  } -                AllocatedSize -= easy->GetExpectedSize();                  Allocated.erase(it);              } @@ -360,7 +411,6 @@ private:              for (const auto& item : Allocated) {                  works.emplace(std::move(item.second)); -                AllocatedSize -= works.top()->GetExpectedSize();              }              AllocatedSize = 0ULL; @@ -379,7 +429,7 @@ private:      void Download(          TString url,          THeaders headers, -        std::size_t expectedSize, +        size_t expectedSize,          TOnResult callback,          TString data,          IRetryPolicy<long>::TPtr retryPolicy) final @@ -393,7 +443,6 @@ private:          }          const std::unique_lock lock(Sync);          auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; -        StraightInFlight->Set(Requests.size());          if (const auto& easy = entry.lock())              if (easy->AddCallback(callback))                  return; @@ -408,25 +457,27 @@ private:      void Download(          TString url,          THeaders headers, -        std::size_t offset, +        size_t offset,          TOnNewDataPart onNewData,          TOnDownloadFinish onFinish) final      { -        constexpr auto buffersSize = CURL_MAX_WRITE_SIZE << 4U; -        auto easy = TEasyCurlStream::Make(InFlight, DownloadedBytes, std::move(url), std::move(headers), offset, buffersSize, std::move(onNewData), std::move(onFinish)); + +        auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));          const std::unique_lock lock(Sync); -        Await.emplace(std::move(easy)); -        Wakeup(buffersSize); +        const auto handle = stream->GetHandle(); +        Streams.emplace_back(stream); +        Allocated.emplace(handle, std::move(stream)); +        Wakeup(0ULL);      } -    void OnRetry(TEasyCurl::TPtr easy) { +    void OnRetry(TEasyCurlBuffer::TPtr easy) {          const std::unique_lock lock(Sync); -        const std::size_t expectedSize = easy->GetExpectedSize(); +        const size_t expectedSize = easy->GetExpectedSize();          Await.emplace(std::move(easy));          Wakeup(expectedSize);      } -    void Wakeup(std::size_t expectedSize) { +    void Wakeup(size_t expectedSize) {          AwaitQueue->Set(Await.size());          if (Allocated.size() < MaxHandlers && AllocatedSize + expectedSize + OutputSize.load() <= MaxSimulatenousDownloadsSize) {              curl_multi_wakeup(Handle); @@ -436,7 +487,7 @@ private:      class THttpGatewayTask: public TTaskScheduler::IRepeatedTask {      public:          THttpGatewayTask( -            TEasyCurl::TPtr easy, +            TEasyCurlBuffer::TPtr easy,              THTTPMultiGateway::TWeakPtr gateway)              : Easy(easy)              , Gateway(gateway) @@ -450,23 +501,24 @@ private:              return false;          }      private: -        TEasyCurl::TPtr Easy; +        TEasyCurlBuffer::TPtr Easy;          THTTPMultiGateway::TWeakPtr Gateway;      };  private:      CURLM* Handle = nullptr; -    std::queue<TEasyCurl::TPtr> Await; +    std::queue<TEasyCurlBuffer::TPtr> Await; +    std::vector<TEasyCurlStream::TWeakPtr> Streams;      std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated; -    std::unordered_map<TKeyType, TEasyCurl::TWeakPtr, TKeyHash> Requests; -    std::unordered_map<TEasyCurl::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState; +    std::unordered_map<TKeyType, TEasyCurlBuffer::TWeakPtr, TKeyHash> Requests; +    std::unordered_map<TEasyCurlBuffer::TPtr, IRetryPolicy<long>::IRetryState::TPtr> Easy2RetryState;      std::mutex Sync;      std::thread Thread; -    std::size_t AllocatedSize = 0ULL; +    size_t AllocatedSize = 0ULL;      static std::atomic_size_t OutputSize;      static std::mutex CreateSync; @@ -475,7 +527,7 @@ private:      const NMonitoring::TDynamicCounterPtr Counters;      const NMonitoring::TDynamicCounters::TCounterPtr Rps;      const NMonitoring::TDynamicCounters::TCounterPtr InFlight; -    const NMonitoring::TDynamicCounters::TCounterPtr StraightInFlight; // doesn't consider merged requests which use one curl +    const NMonitoring::TDynamicCounters::TCounterPtr InFlightStreams;      const NMonitoring::TDynamicCounters::TCounterPtr MaxInFlight;      const NMonitoring::TDynamicCounters::TCounterPtr AllocatedMemory;      const NMonitoring::TDynamicCounters::TCounterPtr MaxAllocatedMemory; @@ -494,44 +546,61 @@ THTTPMultiGateway::TWeakPtr THTTPMultiGateway::Singleton;  } -IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode) +IHTTPGateway::TContentBase::TContentBase(TString&& data)      : TString(std::move(data)) -    , HttpResponseCode(httpResponseCode)  {      if (!empty()) {          THTTPMultiGateway::OutputSize.fetch_add(size());      }  } -IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode) +IHTTPGateway::TContentBase::TContentBase(const TString& data)      : TString(data) -    , HttpResponseCode(httpResponseCode)  {      if (!empty()) {          THTTPMultiGateway::OutputSize.fetch_add(size());      }  } -IHTTPGateway::TContent::TContent(TString&& data) -    : TContent(std::move(data), 0) -{} - -IHTTPGateway::TContent::TContent(const TString& data) -    : TContent(data, 0) -{} - -TString IHTTPGateway::TContent::Extract() { +IHTTPGateway::TContentBase::~TContentBase() +{      if (!empty()) {          THTTPMultiGateway::OutputSize.fetch_sub(size());      } -    return std::move(*this);  } -IHTTPGateway::TContent::~TContent() -{ +TString IHTTPGateway::TContentBase::Extract() {      if (!empty()) {          THTTPMultiGateway::OutputSize.fetch_sub(size());      } +    return std::move(*this); +} + +IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode) +    : TContentBase(std::move(data)) +    , HttpResponseCode(httpResponseCode) +{} + +IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode) +    : TContentBase(data) +    , HttpResponseCode(httpResponseCode) +{} + + +IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter) +    : TContentBase(std::move(data)), Counter(counter) +{ +    Counter->fetch_add(size()); +} + +IHTTPGateway::TCountedContent::~TCountedContent() +{ +    Counter->fetch_sub(size()); +} + +TString IHTTPGateway::TCountedContent::Extract() { +    Counter->fetch_sub(size()); +    return TContentBase::Extract();  }  IHTTPGateway::TPtr @@ -546,5 +615,4 @@ IHTTPGateway::Make(const THttpGatewayConfig* httpGatewaysCfg, NMonitoring::TDyna      return gateway;  } -  } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 7b745323b57..d5979bcce19 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -8,6 +8,7 @@  #include <library/cpp/retry/retry_policy.h>  #include <library/cpp/threading/task_scheduler/task_scheduler.h> +#include <atomic>  #include <variant>  #include <functional> @@ -24,28 +25,36 @@ public:          const THttpGatewayConfig* httpGatewaysCfg = nullptr,          NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); -    class TContent : private TString { -    friend class TEasyCurl; -    public: -        TContent(TString&& data, long httpResponseCode); -        TContent(const TString& data, long httpResponseCode); -        TContent(TString&& data); -        TContent(const TString& data); +    class TContentBase : protected TString { +    protected: +        TContentBase(TString&& data); +        TContentBase(const TString& data); -        TString Extract(); -        ~TContent(); - -        TContent(TContent&&) = default; -        TContent& operator=(TContent&&) = default; +        TContentBase(TContentBase&&) = default; +        TContentBase& operator=(TContentBase&&) = default; +        ~TContentBase(); +        TString Extract(); +    public:          using TString::size;          using TString::data;          inline operator std::string_view() const { return { data(), size() }; }      private: -        TContent(const TContent&) = delete; -        TContent& operator=(const TContent&) = delete; +        TContentBase(const TContentBase&) = delete; +        TContentBase& operator=(const TContentBase&) = delete; +    }; + +    class TContent : public TContentBase { +    friend class TEasyCurl; +    public: +        TContent(TString&& data, long httpResponseCode = 0LL); +        TContent(const TString& data, long httpResponseCode = 0LL); + +        TContent(TContent&& src) = default; +        TContent& operator=(TContent&& src) = default; +        using TContentBase::Extract;      public:          long HttpResponseCode;      }; @@ -63,7 +72,20 @@ public:          IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()      ) = 0; -    using TOnNewDataPart = std::function<void(TContent&&)>; +    class TCountedContent : public TContentBase { +    public: +        TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter); +        ~TCountedContent(); + +        TCountedContent(TCountedContent&&) = default; +        TCountedContent& operator=(TCountedContent&& src) = default; + +        TString Extract(); +    private: +        const std::shared_ptr<std::atomic_size_t> Counter; +    }; + +    using TOnNewDataPart = std::function<void(TCountedContent&&)>;      using TOnDownloadFinish = std::function<void(std::optional<TIssues>)>;      virtual void Download( diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 6155aa76603..2910c9c2487 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -55,6 +55,7 @@ enum ETokenType {  message THttpGatewayConfig {      optional uint32 MaxInFlightCount = 1;      optional uint64 MaxSimulatenousDownloadsSize = 2; +    optional uint32 BuffersSizePerStream = 3;  }  /////////////////////////////// YT /////////////////////////////// diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 8c9b01ccf32..3c47e5d9632 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -72,6 +72,11 @@ struct TEvPrivate {          const size_t PathIndex;      }; +    struct TEvDataPart : public TEventLocal<TEvDataPart, EvReadResult> { +        TEvDataPart(IHTTPGateway::TCountedContent&& data) : Result(std::move(data)) {} +        IHTTPGateway::TCountedContent Result; +    }; +      struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {};      struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { @@ -281,7 +286,7 @@ private:      class TReadBufferFromStream : public NDB::ReadBuffer {      public:          TReadBufferFromStream(TS3ReadCoroImpl* coro) -            : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro), Value(TString()) +            : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro)          {}      private:          bool nextImpl() final { @@ -294,18 +299,18 @@ private:          }          TS3ReadCoroImpl *const Coro; -        IHTTPGateway::TContent Value; +        TString Value;      };  public:      TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& sourceActorId, const NActors::TActorId& computeActorId, const TReadSpec::TPtr& readSpec)          : TActorCoroImpl(256_KB), InputIndex(inputIndex), ReadSpec(readSpec), SourceActorId(sourceActorId), ComputeActorId(computeActorId)      {} -    bool Next(IHTTPGateway::TContent& value) { +    bool Next(TString& value) {          if (Finished)              return false; -        const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); +        const auto ev = WaitForSpecificEvent<TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();          switch (const auto etype = ev->GetTypeRewrite()) {              case TEvPrivate::TEvReadFinished::EventType:                  Finished = true; @@ -313,8 +318,8 @@ public:              case TEvPrivate::TEvReadError::EventType:                  Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));                  return false; -            case TEvPrivate::TEvReadResult::EventType: -                value = std::move(ev->Get<TEvPrivate::TEvReadResult>()->Result); +            case TEvPrivate::TEvDataPart::EventType: +                value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();                  Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));                  return true;              default: @@ -380,10 +385,10 @@ public:          , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize))      {}  private: -    static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TContent&& data) { +    static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TCountedContent&& data) {          retryStuff->Offset += data.size();          retryStuff->RetryParams.Reset(); -        actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadResult(std::move(data)))); +        actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));      }      static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::optional<TIssues> result) {  | 
