diff options
author | hor911 <hor911@ydb.tech> | 2023-01-20 03:41:54 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-01-20 03:41:54 +0300 |
commit | e1b2f2bb4261482bac275b4c9fbb36775fec2501 (patch) | |
tree | e7982a5e6c25bc23237d5521ebcd5ad0c69e25d3 | |
parent | 899159c396b4ab1dbc5a20f53a110fe8de786c28 (diff) | |
download | ydb-e1b2f2bb4261482bac275b4c9fbb36775fec2501.tar.gz |
S3 Read Inflight - not streaming case
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 116 |
1 files changed, 103 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 601094bd6b..a7ea4000b9 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -241,8 +241,12 @@ public: ui64 startPathIndex, const NActors::TActorId& computeActorId, ui64 sizeLimit, - const IRetryPolicy<long>::TPtr& retryPolicy - ) : Gateway(std::move(gateway)) + const IRetryPolicy<long>::TPtr& retryPolicy, + const TS3ReadActorFactoryConfig& readActorFactoryCfg, + ::NMonitoring::TDynamicCounterPtr counters, + ::NMonitoring::TDynamicCounterPtr taskCounters + ) : ReadActorFactoryCfg(readActorFactoryCfg) + , Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) , TxId(txId) @@ -255,20 +259,57 @@ public: , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) , SizeLimit(sizeLimit) - {} + , Counters(counters) + , TaskCounters(taskCounters) + { + if (Counters) { + QueueDataSize = Counters->GetCounter("QueueDataSize"); + QueueDataLimit = Counters->GetCounter("QueueDataLimit"); + QueueBlockCount = Counters->GetCounter("QueueBlockCount"); + QueueDataLimit->Add(ReadActorFactoryCfg.DataInflight); + } + if (TaskCounters) { + TaskQueueDataSize = TaskCounters->GetCounter("QueueDataSize"); + TaskQueueDataLimit = TaskCounters->GetCounter("QueueDataLimit"); + TaskQueueDataLimit->Add(ReadActorFactoryCfg.DataInflight); + } + } void Bootstrap() { LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex); Become(&TS3ReadActor::StateFunc); - for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { - const TPath& path = Paths[pathInd]; - auto url = Url + std::get<TString>(path); - auto id = pathInd + StartPathIndex; - const TString requestId = CreateGuidAsString(); - LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]"); - Gateway->Download(url, MakeHeaders(Token, requestId), std::min(std::get<size_t>(path), SizeLimit), - std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id), {}, RetryPolicy); - }; + while (TryStartDownload()) { + + } + } + + bool TryStartDownload() { + if (CurrentPathIndex >= Paths.size()) { + // no path is pending + return false; + } + if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) { + // too large data inflight + return false; + } + if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) { + // too large download inflight + return false; + } + + StartDownload(CurrentPathIndex++); + return true; + } + + void StartDownload(size_t index) { + DownloadInflight++; + const TPath& path = Paths[index]; + auto url = Url + std::get<TString>(path); + auto id = index + StartPathIndex; + const TString requestId = CreateGuidAsString(); + LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]"); + Gateway->Download(url, MakeHeaders(Token, requestId), std::min(std::get<size_t>(path), SizeLimit), + std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id), {}, RetryPolicy); } static constexpr char ActorName[] = "S3_READ_ACTOR"; @@ -321,6 +362,16 @@ private: Blocks.pop(); total += size; freeSpace -= size; + + QueueTotalDataSize -= size; + if (Counters) { + QueueDataSize->Sub(size); + QueueBlockCount->Dec(); + } + if (TaskCounters) { + TaskQueueDataSize->Sub(size); + } + TryStartDownload(); } while (!Blocks.empty() && freeSpace > 0LL); } @@ -341,7 +392,18 @@ private: IngressBytes += result->Get()->Result.size(); LOG_D("TS3ReadActor", "ID: " << id << ", Path: " << path << ", read size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode << ", request id: [" << requestId << "]"); if (200 == httpCode || 206 == httpCode) { + auto size = result->Get()->Result.size(); + QueueTotalDataSize += size; + if (Counters) { + QueueBlockCount->Inc(); + QueueDataSize->Add(size); + } + if (TaskCounters) { + TaskQueueDataSize->Add(size); + } Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id)); + DownloadInflight--; + TryStartDownload(); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } else { TString errorText = result->Get()->Result.Extract(); @@ -368,6 +430,18 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor LOG_D("TS3ReadActor", "PassAway"); + + if (Counters) { + QueueDataSize->Sub(QueueTotalDataSize); + QueueBlockCount->Sub(Blocks.size()); + QueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); + } + if (TaskCounters) { + TaskQueueDataSize->Sub(QueueTotalDataSize); + TaskQueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); + } + QueueTotalDataSize = 0; + ContainerCache.Clear(); TActorBootstrapped<TS3ReadActor>::PassAway(); } @@ -383,6 +457,7 @@ private: private: size_t IsDoneCounter = 0U; + const TS3ReadActorFactoryConfig ReadActorFactoryCfg; const IHTTPGateway::TPtr Gateway; const THolderFactory& HolderFactory; TPlainContainerCache ContainerCache; @@ -403,6 +478,20 @@ private: ui64 IngressBytes = 0; std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks; + + ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize; + ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataLimit; + ::NMonitoring::TDynamicCounters::TCounterPtr QueueBlockCount; + ::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused; + ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit; + ::NMonitoring::TDynamicCounterPtr Counters; + ::NMonitoring::TDynamicCounterPtr TaskCounters; + ui64 CurrentPathIndex = 0; + ui64 QueueTotalDataSize = 0; + ui64 DownloadInflight = 0; }; struct TReadSpec { @@ -1662,7 +1751,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( sizeLimit = FromString<ui64>(it->second); const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy); + std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy, + cfg, counters, taskCounters); return {actor, actor}; } } |