diff options
author | hor911 <hor911@ydb.tech> | 2022-11-21 14:39:53 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-11-21 14:39:53 +0300 |
commit | d8296c2cef1913f2580fefe89ef24ec4c84f5c6c (patch) | |
tree | d943c3dd7241ccabde37540e15c28517a0741399 | |
parent | 6b13e9638c8fa795d4c1f1e8036eee2beb8517d6 (diff) | |
download | ydb-d8296c2cef1913f2580fefe89ef24ec4c84f5c6c.tar.gz |
Limit S3 Read Inflight
4 files changed, 49 insertions, 29 deletions
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto index 337b639a6ce..ab5de69f82d 100644 --- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto +++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto @@ -11,6 +11,7 @@ import "ydb/library/yql/providers/s3/proto/retry_config.proto"; message TS3ReadActorFactoryConfig { NYql.NS3.TRetryConfig RetryConfig = 1; uint64 RowsInBatch = 2; // Default = 1000 + uint64 MaxInflight = 3; // Default = 10 } message TPqReadActorFactoryConfig { diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index bfc26130148..dd6b1bd0968 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -167,6 +167,9 @@ void Init( if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) { readActorFactoryCfg.RowsInBatch = rowsInBatch; } + if (const ui64 maxInflight = s3readConfig.GetMaxInflight()) { + readActorFactoryCfg.MaxInflight = maxInflight; + } RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 0235992e459..8a655ffa300 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -157,14 +157,15 @@ struct TEvPrivate { }; struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { - TEvReadFinished(ui64 ingressBytes = 0) - : IngressBytes(ingressBytes) { + TEvReadFinished(size_t pathIndex, ui64 ingressBytes = 0) + : PathIndex(pathIndex), IngressBytes(ingressBytes) { } - TEvReadFinished(TIssues&& issues, ui64 ingressBytes = 0) - : Issues(std::move(issues)), IngressBytes(ingressBytes) { + TEvReadFinished(size_t pathIndex, TIssues&& issues, ui64 ingressBytes = 0) + : PathIndex(pathIndex), Issues(std::move(issues)), IngressBytes(ingressBytes) { } + const size_t PathIndex; TIssues Issues; ui64 IngressBytes; }; @@ -454,16 +455,16 @@ void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data)))); } -void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, TIssues issues) { - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(std::move(issues)))); +void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, TIssues issues) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, std::move(issues)))); } -void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent) { +void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex) { retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url, retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit, std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1), std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1), - std::bind(&OnDownloadFinished, actorSystem, self, parent, std::placeholders::_1)); + std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1)); } template <typename T> @@ -594,7 +595,7 @@ public: if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) { LOG_CORO_D("TS3ReadCoroImpl", "TS3ReadCoroActor" << ": " << SelfActorId << ", TxId: " << RetryStuff->TxId << ". Retry Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); - GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId)))); + GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex)))); } else { LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]"); InputFinished = true; @@ -738,7 +739,7 @@ private: if (issues) Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); else - Send(ParentActorId, new TEvPrivate::TEvReadFinished(IngressBytes)); + Send(ParentActorId, new TEvPrivate::TEvReadFinished(PathIndex, IngressBytes)); } catch (const TDtorException&) { return RetryStuff->Cancel(); } catch (const std::exception& err) { @@ -849,20 +850,22 @@ private: class TS3ReadCoroActor : public TActorCoro { public: - TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff) + TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex) : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) , RetryStuff(std::move(retryStuff)) + , PathIndex(pathIndex) {} private: void Registered(TActorSystem* actorSystem, const TActorId& parent) override { TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself. if (RetryStuff->Url.substr(0, 6) != "file://") { LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); - DownloadStart(RetryStuff, actorSystem, SelfId(), parent); + DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex); } } const TRetryStuff::TPtr RetryStuff; + const size_t PathIndex; }; ui64 GetSizeOfData(const arrow::ArrayData& data) { @@ -931,16 +934,21 @@ public: void Bootstrap() { LOG_D("TS3StreamReadActor", "Bootstrap"); Become(&TS3StreamReadActor::StateFunc); - for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { - const TPath& path = Paths[pathInd]; - const TString requestId = CreateGuidAsString(); - auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), std::get<std::size_t>(path), TxId, requestId, RetryPolicy); - RetryStuffForFile.push_back(stuff); - auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg); - RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release()); + while (CurrentPathIndex < std::min(ReadActorFactoryCfg.MaxInflight, Paths.size())) { + RegisterCoro(CurrentPathIndex++); } } + void RegisterCoro(size_t index) { + const TPath& path = Paths[index]; + const TString requestId = CreateGuidAsString(); + auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), std::get<std::size_t>(path), TxId, requestId, RetryPolicy); + auto pathIndex = index + StartPathIndex; + RetryStuffForFile.emplace(pathIndex, stuff); + auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg); + RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release()); + } + static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR"; private: @@ -1031,8 +1039,8 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor LOG_D("TS3StreamReadActor", "PassAway"); - for (auto stuff: RetryStuffForFile) { - stuff->Cancel(); + for (auto pair: RetryStuffForFile) { + pair.second->Cancel(); } ContainerCache.Clear(); TActorBootstrapped<TS3StreamReadActor>::PassAway(); @@ -1073,21 +1081,27 @@ private: void HandleReadFinished(TEvPrivate::TEvReadFinished::TPtr& ev) { IngressBytes = ev->Get()->IngressBytes; + RetryStuffForFile.erase(ev->Get()->PathIndex); Y_VERIFY(Count); --Count; - /* - If an empty range is being downloaded on the last file, - then we need to pass the information to Compute Actor that - the download of all data is finished in this place - */ - if (Blocks.empty() && Count == 0) { - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + + if (CurrentPathIndex < Paths.size()) { + RegisterCoro(CurrentPathIndex++); + } else { + /* + If an empty range is being downloaded on the last file, + then we need to pass the information to Compute Actor that + the download of all data is finished in this place + */ + if (Blocks.empty() && Count == 0) { + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + } } } const TS3ReadActorFactoryConfig ReadActorFactoryCfg; const IHTTPGateway::TPtr Gateway; - std::vector<TRetryStuff::TPtr> RetryStuffForFile; + THashMap<size_t, TRetryStuff::TPtr> RetryStuffForFile; const THolderFactory& HolderFactory; TPlainContainerCache ContainerCache; TPlainContainerCache ArrowTupleContainerCache; @@ -1108,6 +1122,7 @@ private: ui32 Count; const std::size_t MaxBlocksInFly; ui64 IngressBytes = 0; + size_t CurrentPathIndex = 0; }; using namespace NKikimr::NMiniKQL; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 7ee8b1d4c59..bf6c6c7eab5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -13,6 +13,7 @@ namespace NYql::NDq { struct TS3ReadActorFactoryConfig { ui64 RowsInBatch = 1000; + ui64 MaxInflight = 1000; }; void RegisterS3ReadActorFactory( |