diff options
| author | hor911 <[email protected]> | 2022-11-21 14:39:53 +0300 | 
|---|---|---|
| committer | hor911 <[email protected]> | 2022-11-21 14:39:53 +0300 | 
| commit | d8296c2cef1913f2580fefe89ef24ec4c84f5c6c (patch) | |
| tree | d943c3dd7241ccabde37540e15c28517a0741399 | |
| parent | 6b13e9638c8fa795d4c1f1e8036eee2beb8517d6 (diff) | |
Limit S3 Read Inflight
4 files changed, 49 insertions, 29 deletions
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto index 337b639a6ce..ab5de69f82d 100644 --- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto +++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto @@ -11,6 +11,7 @@ import "ydb/library/yql/providers/s3/proto/retry_config.proto";  message TS3ReadActorFactoryConfig {      NYql.NS3.TRetryConfig RetryConfig = 1;      uint64 RowsInBatch = 2; // Default = 1000 +    uint64 MaxInflight = 3; // Default = 10  }  message TPqReadActorFactoryConfig { diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index bfc26130148..dd6b1bd0968 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -167,6 +167,9 @@ void Init(          if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {              readActorFactoryCfg.RowsInBatch = rowsInBatch;          } +        if (const ui64 maxInflight = s3readConfig.GetMaxInflight()) { +            readActorFactoryCfg.MaxInflight = maxInflight; +        }          RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());          RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);          RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 0235992e459..8a655ffa300 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -157,14 +157,15 @@ struct TEvPrivate {      };      struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { -        TEvReadFinished(ui64 ingressBytes = 0) -            : IngressBytes(ingressBytes) { +        TEvReadFinished(size_t pathIndex, ui64 ingressBytes = 0) +            : PathIndex(pathIndex), IngressBytes(ingressBytes) {          } -        TEvReadFinished(TIssues&& issues, ui64 ingressBytes = 0) -            : Issues(std::move(issues)), IngressBytes(ingressBytes) { +        TEvReadFinished(size_t pathIndex, TIssues&& issues, ui64 ingressBytes = 0) +            : PathIndex(pathIndex), Issues(std::move(issues)), IngressBytes(ingressBytes) {          } +        const size_t PathIndex;          TIssues Issues;          ui64 IngressBytes;      }; @@ -454,16 +455,16 @@ void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId&      actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));  } -void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, TIssues issues) { -    actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(std::move(issues)))); +void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, TIssues issues) { +    actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, std::move(issues))));  } -void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent) { +void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex) {      retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url,          retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit,          std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1),          std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1), -        std::bind(&OnDownloadFinished, actorSystem, self, parent, std::placeholders::_1)); +        std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1));  }  template <typename T> @@ -594,7 +595,7 @@ public:                  if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) {                      LOG_CORO_D("TS3ReadCoroImpl", "TS3ReadCoroActor" << ": " << SelfActorId << ", TxId: " << RetryStuff->TxId << ". Retry Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); -                    GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId)))); +                    GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex))));                  } else {                      LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]");                      InputFinished = true; @@ -738,7 +739,7 @@ private:          if (issues)              Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));          else -            Send(ParentActorId, new TEvPrivate::TEvReadFinished(IngressBytes)); +            Send(ParentActorId, new TEvPrivate::TEvReadFinished(PathIndex, IngressBytes));      } catch (const TDtorException&) {          return RetryStuff->Cancel();      } catch (const std::exception& err) { @@ -849,20 +850,22 @@ private:  class TS3ReadCoroActor : public TActorCoro {  public: -    TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff) +    TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex)          : TActorCoro(THolder<TActorCoroImpl>(impl.Release()))          , RetryStuff(std::move(retryStuff)) +        , PathIndex(pathIndex)      {}  private:      void Registered(TActorSystem* actorSystem, const TActorId& parent) override {          TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.          if (RetryStuff->Url.substr(0, 6) != "file://") {              LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); -            DownloadStart(RetryStuff, actorSystem, SelfId(), parent); +            DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex);          }      }      const TRetryStuff::TPtr RetryStuff; +    const size_t PathIndex;  };  ui64 GetSizeOfData(const arrow::ArrayData& data) { @@ -931,16 +934,21 @@ public:      void Bootstrap() {          LOG_D("TS3StreamReadActor", "Bootstrap");          Become(&TS3StreamReadActor::StateFunc); -        for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { -            const TPath& path = Paths[pathInd]; -            const TString requestId = CreateGuidAsString(); -            auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), std::get<std::size_t>(path), TxId, requestId, RetryPolicy); -            RetryStuffForFile.push_back(stuff); -            auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg); -            RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release()); +        while (CurrentPathIndex < std::min(ReadActorFactoryCfg.MaxInflight, Paths.size())) { +            RegisterCoro(CurrentPathIndex++);          }      } +    void RegisterCoro(size_t index) { +        const TPath& path = Paths[index]; +        const TString requestId = CreateGuidAsString(); +        auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), std::get<std::size_t>(path), TxId, requestId, RetryPolicy); +        auto pathIndex = index + StartPathIndex; +        RetryStuffForFile.emplace(pathIndex, stuff); +        auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg); +        RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release()); +    }  +      static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR";  private: @@ -1031,8 +1039,8 @@ private:      // IActor & IDqComputeActorAsyncInput      void PassAway() override { // Is called from Compute Actor          LOG_D("TS3StreamReadActor", "PassAway"); -        for (auto stuff: RetryStuffForFile) { -            stuff->Cancel(); +        for (auto pair: RetryStuffForFile) { +            pair.second->Cancel();          }          ContainerCache.Clear();          TActorBootstrapped<TS3StreamReadActor>::PassAway(); @@ -1073,21 +1081,27 @@ private:      void HandleReadFinished(TEvPrivate::TEvReadFinished::TPtr& ev) {          IngressBytes = ev->Get()->IngressBytes; +        RetryStuffForFile.erase(ev->Get()->PathIndex);          Y_VERIFY(Count);          --Count; -        /* -        If an empty range is being downloaded on the last file, -        then we need to pass the information to Compute Actor that -        the download of all data is finished in this place -        */ -        if (Blocks.empty() && Count == 0) { -            Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + +        if (CurrentPathIndex < Paths.size()) { +            RegisterCoro(CurrentPathIndex++); +        } else { +            /* +            If an empty range is being downloaded on the last file, +            then we need to pass the information to Compute Actor that +            the download of all data is finished in this place +            */ +            if (Blocks.empty() && Count == 0) { +                Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); +            }          }      }      const TS3ReadActorFactoryConfig ReadActorFactoryCfg;      const IHTTPGateway::TPtr Gateway; -    std::vector<TRetryStuff::TPtr> RetryStuffForFile; +    THashMap<size_t, TRetryStuff::TPtr> RetryStuffForFile;      const THolderFactory& HolderFactory;      TPlainContainerCache ContainerCache;      TPlainContainerCache ArrowTupleContainerCache; @@ -1108,6 +1122,7 @@ private:      ui32 Count;      const std::size_t MaxBlocksInFly;      ui64 IngressBytes = 0; +    size_t CurrentPathIndex = 0;  };  using namespace NKikimr::NMiniKQL; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 7ee8b1d4c59..bf6c6c7eab5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -13,6 +13,7 @@ namespace NYql::NDq {  struct TS3ReadActorFactoryConfig {      ui64 RowsInBatch = 1000; +    ui64 MaxInflight = 1000;  };  void RegisterS3ReadActorFactory(  | 
