about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author hor911 <hor911@ydb.tech> 2023-01-20 03:41:54 +0300
committer hor911 <hor911@ydb.tech> 2023-01-20 03:41:54 +0300
commit e1b2f2bb4261482bac275b4c9fbb36775fec2501 (patch)
tree e7982a5e6c25bc23237d5521ebcd5ad0c69e25d3
parent 899159c396b4ab1dbc5a20f53a110fe8de786c28 (diff)
download ydb-e1b2f2bb4261482bac275b4c9fbb36775fec2501.tar.gz
S3 Read Inflight - not streaming case
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 116
1 files changed, 103 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 601094bd6b..a7ea4000b9 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -241,8 +241,12 @@ public:
ui64 startPathIndex,
const NActors::TActorId& computeActorId,
ui64 sizeLimit,
- const IRetryPolicy<long>::TPtr& retryPolicy
- ) : Gateway(std::move(gateway))
+ const IRetryPolicy<long>::TPtr& retryPolicy,
+ const TS3ReadActorFactoryConfig& readActorFactoryCfg,
+ ::NMonitoring::TDynamicCounterPtr counters,
+ ::NMonitoring::TDynamicCounterPtr taskCounters
+ ) : ReadActorFactoryCfg(readActorFactoryCfg)
+ , Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
, TxId(txId)
@@ -255,20 +259,57 @@ public:
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
, SizeLimit(sizeLimit)
- {}
+ , Counters(counters)
+ , TaskCounters(taskCounters)
+ {
+ if (Counters) {
+ QueueDataSize = Counters->GetCounter("QueueDataSize");
+ QueueDataLimit = Counters->GetCounter("QueueDataLimit");
+ QueueBlockCount = Counters->GetCounter("QueueBlockCount");
+ QueueDataLimit->Add(ReadActorFactoryCfg.DataInflight);
+ }
+ if (TaskCounters) {
+ TaskQueueDataSize = TaskCounters->GetCounter("QueueDataSize");
+ TaskQueueDataLimit = TaskCounters->GetCounter("QueueDataLimit");
+ TaskQueueDataLimit->Add(ReadActorFactoryCfg.DataInflight);
+ }
+ }
void Bootstrap() {
LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex);
Become(&TS3ReadActor::StateFunc);
- for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
- const TPath& path = Paths[pathInd];
- auto url = Url + std::get<TString>(path);
- auto id = pathInd + StartPathIndex;
- const TString requestId = CreateGuidAsString();
- LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]");
- Gateway->Download(url, MakeHeaders(Token, requestId), std::min(std::get<size_t>(path), SizeLimit),
- std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id), {}, RetryPolicy);
- };
+ while (TryStartDownload()) { // keep starting downloads until TryStartDownload() returns false (no path left, or a data/inflight limit is hit)
+ // Intentionally empty: each TryStartDownload() call starts at most one download itself.
+ }
+ }
+
+ bool TryStartDownload() { // Starts the download of the next pending path if the limits allow; returns true only when one was started.
+ if (CurrentPathIndex >= Paths.size()) {
+ // Every path has already been handed to StartDownload(); nothing is left to start.
+ return false;
+ }
+ if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) {
+ // Bytes held in the Blocks queue strictly exceed the DataInflight budget; hold off until some are consumed.
+ return false;
+ }
+ if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) {
+ // The count of downloads in flight (DownloadInflight) has reached MaxInflight.
+ return false;
+ }
+
+ StartDownload(CurrentPathIndex++); // post-increment: start the current path, then advance the cursor
+ return true;
+ }
+
+ void StartDownload(size_t index) { // Issues the gateway download for Paths[index] and counts it in DownloadInflight.
+ DownloadInflight++; // compared against MaxInflight in TryStartDownload()
+ const TPath& path = Paths[index];
+ auto url = Url + std::get<TString>(path);
+ auto id = index + StartPathIndex; // ID bound into the completion callback: local index offset by StartPathIndex
+ const TString requestId = CreateGuidAsString();
+ LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]");
+ Gateway->Download(url, MakeHeaders(Token, requestId), std::min(std::get<size_t>(path), SizeLimit), // third argument: the path's size_t field capped at SizeLimit
+ std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id), {}, RetryPolicy);
}
static constexpr char ActorName[] = "S3_READ_ACTOR";
@@ -321,6 +362,16 @@ private:
Blocks.pop();
total += size;
freeSpace -= size;
+
+ QueueTotalDataSize -= size;
+ if (Counters) {
+ QueueDataSize->Sub(size);
+ QueueBlockCount->Dec();
+ }
+ if (TaskCounters) {
+ TaskQueueDataSize->Sub(size);
+ }
+ TryStartDownload();
} while (!Blocks.empty() && freeSpace > 0LL);
}
@@ -341,7 +392,18 @@ private:
IngressBytes += result->Get()->Result.size();
LOG_D("TS3ReadActor", "ID: " << id << ", Path: " << path << ", read size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode << ", request id: [" << requestId << "]");
if (200 == httpCode || 206 == httpCode) {
+ auto size = result->Get()->Result.size();
+ QueueTotalDataSize += size;
+ if (Counters) {
+ QueueBlockCount->Inc();
+ QueueDataSize->Add(size);
+ }
+ if (TaskCounters) {
+ TaskQueueDataSize->Add(size);
+ }
Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));
+ DownloadInflight--;
+ TryStartDownload();
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
} else {
TString errorText = result->Get()->Result.Extract();
@@ -368,6 +430,18 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
LOG_D("TS3ReadActor", "PassAway");
+
+ if (Counters) { // withdraw this actor's contribution from the counters obtained from Counters
+ QueueDataSize->Sub(QueueTotalDataSize); // bytes still queued and not yet consumed
+ QueueBlockCount->Sub(Blocks.size()); // blocks still sitting in the queue
+ QueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); // reverses the Add() made in the constructor
+ }
+ if (TaskCounters) { // same rollback for the counters obtained from TaskCounters
+ TaskQueueDataSize->Sub(QueueTotalDataSize);
+ TaskQueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); // reverses the Add() made in the constructor
+ }
+ QueueTotalDataSize = 0; // local tally reset now that the counters no longer include it
+
ContainerCache.Clear();
TActorBootstrapped<TS3ReadActor>::PassAway();
}
@@ -383,6 +457,7 @@ private:
private:
size_t IsDoneCounter = 0U;
+ const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const IHTTPGateway::TPtr Gateway;
const THolderFactory& HolderFactory;
TPlainContainerCache ContainerCache;
@@ -403,6 +478,20 @@ private:
ui64 IngressBytes = 0;
std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
+
+ ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataLimit;
+ ::NMonitoring::TDynamicCounters::TCounterPtr QueueBlockCount;
+ ::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused;
+ ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit;
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ ::NMonitoring::TDynamicCounterPtr TaskCounters;
+ ui64 CurrentPathIndex = 0;
+ ui64 QueueTotalDataSize = 0;
+ ui64 DownloadInflight = 0;
};
struct TReadSpec {
@@ -1662,7 +1751,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
sizeLimit = FromString<ui64>(it->second);
const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy);
+ std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy,
+ cfg, counters, taskCounters);
return {actor, actor};
}
}